You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/16 19:30:26 UTC

svn commit: r964866 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/store/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/...

Author: gtully
Date: Fri Jul 16 17:30:25 2010
New Revision: 964866

URL: http://svn.apache.org/viewvc?rev=964866&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2832 - impl for AMQ pa

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java?rev=964866&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java Fri Jul 16 17:30:25 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.Location;
+/** A {@link MessageAck} that additionally carries a {@link Location}. */
+public final class MessageAckWithLocation extends MessageAck {
+    public final Location location; // presumably where the ack sits in the journal — TODO confirm against caller
+
+    public MessageAckWithLocation(MessageAck ack, Location location) {
+        ack.copy(this); // presumably copies the state of the supplied ack into this instance — confirm in MessageAck
+        this.location = location;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Fri Jul 16 17:30:25 2010
@@ -47,7 +47,6 @@ public interface MessageStore extends Se
      * 
      * @param context context
      * @param message
-     * @param l 
      * @return a Future to track when this is complete
      * @throws IOException 
      * @throws IOException
@@ -59,7 +58,6 @@ public interface MessageStore extends Se
      * 
      * @param context context
      * @param message
-     * @param l 
      * @return a Future to track when this is complete
      * @throws IOException 
      * @throws IOException

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Fri Jul 16 17:30:25 2010
@@ -38,6 +38,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.kaha.MessageAckWithLocation;
 import org.apache.activemq.kaha.impl.async.Location;
 import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -70,7 +71,7 @@ public class AMQMessageStore extends Abs
     protected final TaskRunner asyncWriteTask;
     protected CountDownLatch flushLatch;
     private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
-    private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
+    private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
     /** A MessageStore that we can use to retrieve messages quickly. */
     private Map<MessageId, ReferenceData> cpAddedMessageIds;
     private final boolean debug = LOG.isDebugEnabled();
@@ -255,7 +256,7 @@ public class AMQMessageStore extends Abs
             MessageId id = ack.getLastMessageId();
             data = messages.remove(id);
             if (data == null) {
-                messageAcks.add(ack);
+                messageAcks.add(new MessageAckWithLocation(ack, location));
             } else {
                 // message never got written so datafileReference will still exist
                 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
@@ -350,7 +351,7 @@ public class AMQMessageStore extends Abs
      * @throws IOException
      */
     protected Location doAsyncWrite() throws IOException {
-        final List<MessageAck> cpRemovedMessageLocations;
+        final List<MessageAckWithLocation> cpRemovedMessageLocations;
         final List<Location> cpActiveJournalLocations;
         final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
         final Location lastLocation;
@@ -361,7 +362,7 @@ public class AMQMessageStore extends Abs
             cpRemovedMessageLocations = this.messageAcks;
             cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
             this.messages = new LinkedHashMap<MessageId, ReferenceData>();
-            this.messageAcks = new ArrayList<MessageAck>();
+            this.messageAcks = new ArrayList<MessageAckWithLocation>();
             lastLocation = this.lastLocation;
         } finally {
             lock.unlock();
@@ -406,7 +407,7 @@ public class AMQMessageStore extends Abs
                 persitanceAdapter.commitTransaction(context);
                 persitanceAdapter.beginTransaction(context);
                 // Checkpoint the removed messages.
-                for (MessageAck ack : cpRemovedMessageLocations) {
+                for (MessageAckWithLocation ack : cpRemovedMessageLocations) {
                     try {
                         referenceStore.removeMessage(transactionTemplate.getContext(), ack);
                     } catch (Throwable e) {
@@ -576,5 +577,5 @@ public class AMQMessageStore extends Abs
         }
         getReferenceStore().setBatch(messageId);
     }
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Jul 16 17:30:25 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -570,7 +571,6 @@ public class AMQPersistenceAdapter imple
      * 
      * @throws IOException
      * @throws IOException
-     * @throws InvalidLocationException
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
@@ -1051,7 +1051,6 @@ public class AMQPersistenceAdapter imple
     }
 	
 	
-	
 	protected void lock() throws Exception {
         lockLogged = false;
         lockAquired = false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Fri Jul 16 17:30:25 2010
@@ -29,6 +29,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.MessageAckWithLocation;
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -203,17 +204,17 @@ public class KahaReferenceStore extends 
     }
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
-        removeMessage(ack.getLastMessageId());
-    }
-
-    public void removeMessage(MessageId msgId) throws IOException {  
         lock.lock();
         try {
+            MessageId msgId = ack.getLastMessageId();
             StoreEntry entry = messageContainer.getEntry(msgId);
             if (entry != null) {
                 ReferenceRecord rr = messageContainer.remove(msgId);
                 if (rr != null) {
                     removeInterest(rr);
+                    if (ack instanceof MessageAckWithLocation) {
+                        recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId());
+                    }
                     dispatchAudit.isDuplicate(msgId);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
@@ -230,12 +231,18 @@ public class KahaReferenceStore extends 
         }
     }
 
+    private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) {
+        adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId); // data file id of the ack's location, paired with messageFileId
+    }
+
     public void removeAllMessages(ConnectionContext context) throws IOException {
         lock.lock();
         try {
             Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
+            MessageAck ack = new MessageAck();
             for (MessageId id:tmpSet) {
-                removeMessage(id);
+                ack.setLastMessageId(id);
+                removeMessage(null, ack);
             }
             resetBatching();
             messageContainer.clear();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Jul 16 17:30:25 2010
@@ -252,7 +252,44 @@ public class KahaReferenceStoreAdapter e
      * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
      */
     public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
-        return new HashSet<Integer>(recordReferences.keySet());
+        Set<Integer> inUse = new HashSet<Integer>(recordReferences.keySet());
+        // also keep ack data files whose acks refer to a data file that is still in use
+        Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator();
+        while (ackReferences.hasNext()) {
+            Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next();
+            if (!inUse.contains(ackReference.getKey())) {
+                // not in use on its own: keep it only if one of the files it refers to is in use
+                for (Integer referencedFileId : ackReference.getValue()) {
+                    if (inUse.contains(referencedFileId)) {
+                        // keep this ack file
+                        inUse.add(ackReference.getKey());
+                        LOG.debug("not removing data file: " + ackReference.getKey()
+                                        + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue());
+                        break;
+                    }
+                }
+            }
+            if (!inUse.contains(ackReference.getKey())) {
+                ackReferences.remove(); // nothing it refers to is in use, so stop tracking this entry
+            }
+        }
+
+        return inUse;
+    }
+
+    /** Ack data file id -> ids of the data files that held the messages removed by acks in that file. */
+    Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
+
+    /** Records that data file ackDataFileId holds an ack for a message stored in data file messageFileId. */
+    public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) {
+        Integer ackFileKey = Integer.valueOf(ackDataFileId);
+        Set<Integer> referenceFileIds = ackMessageFileMap.get(ackFileKey);
+        if (referenceFileIds == null) {
+            // first reference recorded for this ack data file
+            referenceFileIds = new HashSet<Integer>();
+            ackMessageFileMap.put(ackFileKey, referenceFileIds);
+        }
+        referenceFileIds.add(Integer.valueOf(messageFileId)); // Set.add leaves the set unchanged for a duplicate
     }
 
     /**
@@ -409,4 +446,6 @@ public class KahaReferenceStoreAdapter e
     public void setIndexLoadFactor(int loadFactor) {
         this.indexLoadFactor = loadFactor;
     }
+
+
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java?rev=964866&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java Fri Jul 16 17:30:25 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+
+/** Re-runs the tests inherited from AMQ2832Test with persistence supplied by an AMQPersistenceAdapterFactory. */
+public class SparseAckReplayAfterStoreCleanupAMQStoreTest extends AMQ2832Test {
+    @Override
+    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+        AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
+        brokerService.setPersistenceFactory(factory);
+        // ensure there are a bunch of data files but multiple entries in each
+        factory.setMaxFileLength(1024 * 12);
+        // speed up the test case, checkpoint and cleanup early and often
+        factory.setCheckpointInterval(500);
+        factory.setCleanupInterval(500);
+        factory.setSyncOnWrite(false);
+        if (!deleteAllOnStart) {
+            factory.setForceRecoverReferenceStore(true);
+        }
+        brokerService.getPersistenceAdapter(); // presumably forces the adapter to be created from the factory now — confirm
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date