You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/10/15 21:31:24 UTC

svn commit: r584863 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/impl/async/ store/ store/amq/ store/kahadaptor/

Author: rajdavies
Date: Mon Oct 15 12:31:22 2007
New Revision: 584863

URL: http://svn.apache.org/viewvc?rev=584863&view=rev
Log:
fix data logs not being correctly removed

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon Oct 15 12:31:22 2007
@@ -270,6 +270,12 @@
         storeSize.addAndGet(size);
         return currentWriteFile;
     }
+    
+    public synchronized void removeLocation(Location location) throws IOException{
+       
+        DataFile dataFile = getDataFile(location);
+        dataFile.decrement();
+    }
 
     DataFile getDataFile(Location item) throws IOException {
         Integer key = Integer.valueOf(item.getDataFileId());
@@ -346,6 +352,7 @@
     synchronized void addInterestInFile(DataFile dataFile) {
         if (dataFile != null) {
             dataFile.increment();
+            System.err.println("ADD INTEREST: " + dataFile);
         }
     }
 
@@ -355,6 +362,7 @@
             DataFile dataFile = (DataFile)fileMap.get(key);
             removeInterestInFile(dataFile);
         }
+       
     }
 
     synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
@@ -362,24 +370,20 @@
             if (dataFile.decrement() <= 0) {
                 removeDataFile(dataFile);
             }
+            System.err.println("REMOVE INTEREST: " + dataFile);
         }
     }
 
     public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse) throws IOException {
-
-        // Substract and the difference is the set of files that are no longer
-        // needed :)
         Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
         unUsed.removeAll(inUse);
-
         List<DataFile> purgeList = new ArrayList<DataFile>();
         for (Integer key : unUsed) {
             DataFile dataFile = (DataFile)fileMap.get(key);
             purgeList.add(dataFile);
         }
-
         for (DataFile dataFile : purgeList) {
-            removeDataFile(dataFile);
+            forceRemoveDataFile(dataFile);
         }
     }
 
@@ -399,16 +403,20 @@
 
         // Make sure we don't delete too much data.
         if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
-            return;
+            LOG.debug("Won't remove DataFile" + dataFile);
+        	return;
         }
-
+        forceRemoveDataFile(dataFile);
+    }
+    
+    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
-
-        fileMap.remove(dataFile.getDataFileId());
+        DataFile removed = fileMap.remove(dataFile.getDataFileId());
         storeSize.addAndGet(-dataFile.getLength());
         dataFile.unlink();
         boolean result = dataFile.delete();
-        LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
+        LOG.debug("discarding data file " + dataFile
+                + (result ? "successful " : "failed"));
 
     }
 
@@ -519,7 +527,8 @@
     }
 
     public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
-        return appender.storeItem(data, Location.USER_TYPE, sync);
+        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
+        return loc;
     }
 
     public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Mon Oct 15 12:31:22 2007
@@ -66,6 +66,10 @@
     public synchronized int decrement() {
         return --referenceCount;
     }
+    
+    public synchronized int getReferenceCount(){
+    	return referenceCount;
+    }
 
     public synchronized boolean isUnused() {
         return referenceCount <= 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Mon Oct 15 12:31:22 2007
@@ -59,9 +59,8 @@
             return rc;
         }
 
-        public void closeDataFileReader(DataFileAccessor reader) {
+        public synchronized void closeDataFileReader(DataFileAccessor reader) {
             openCounter--;
-            used = true;
             if (pool.size() >= maxOpenReadersPerFile || disposed) {
                 reader.dispose();
             } else {
@@ -69,15 +68,15 @@
             }
         }
 
-        public void clearUsedMark() {
+        public synchronized void clearUsedMark() {
             used = false;
         }
 
-        public boolean isUsed() {
+        public synchronized boolean isUsed() {
             return used;
         }
 
-        public void dispose() {
+        public synchronized void dispose() {
             for (DataFileAccessor reader : pool) {
                 reader.dispose();
             }
@@ -85,7 +84,7 @@
             disposed = true;
         }
 
-        public int getOpenCounter() {
+        public synchronized int getOpenCounter() {
             return openCounter;
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Oct 15 12:31:22 2007
@@ -17,9 +17,10 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Mon Oct 15 12:31:22 2007
@@ -31,17 +31,20 @@
  */
 public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
     /**
-     * Stores the last acknowledged messgeID for the given subscription so that
+     * Removes the last acknowledged messgeID for the given subscription so that
      * we can recover and commence dispatching messages from the last checkpoint
+     * N.B. - all messages previous to this one for a given subscriber
+     * should also be acknowledged
      * 
      * @param context
      * @param clientId
      * @param subscriptionName
      * @param messageId
      * @param subscriptionPersistentId
+     * @return true if there are no more references to the message - or the message is null
      * @throws IOException
      */
-    void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
+    boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
 
     /**
      * @param clientId

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Oct 15 12:31:22 2007
@@ -180,7 +180,7 @@
 
     /**
      */
-    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+    public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
         JournalQueueAck remove = new JournalQueueAck();
         remove.setDestination(destination);
         remove.setMessageAck(ack);
@@ -189,7 +189,7 @@
             if (debug) {
                 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
             }
-            removeMessage(ack, location);
+            removeMessage(ack,location);
         } else {
             if (debug) {
                 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
@@ -206,7 +206,7 @@
                     }
                     synchronized (AMQMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        removeMessage(ack, location);
+                        removeMessage(ack,location);
                     }
                 }
 
@@ -240,7 +240,7 @@
             }
         }
     }
-
+      
     public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
         try {
             // Only remove the message if it has not already been removed.
@@ -378,16 +378,28 @@
      * 
      */
     public Message getMessage(MessageId identity) throws IOException {
+        Location location = getLocation(identity);
+        DataStructure rc = peristenceAdapter.readCommand(location);
+        try {
+            return (Message) rc;
+        } catch (ClassCastException e) {
+            throw new IOException("Could not read message " + identity
+                    + " at location " + location
+                    + ", expected a message, but got: " + rc);
+        }
+    }
+    
+    protected Location getLocation(MessageId messageId) throws IOException {
         ReferenceData data = null;
         synchronized (this) {
             // Is it still in flight???
-            data = messages.get(identity);
+            data = messages.get(messageId);
             if (data == null && cpAddedMessageIds != null) {
-                data = cpAddedMessageIds.get(identity);
+                data = cpAddedMessageIds.get(messageId);
             }
         }
         if (data == null) {
-            data = referenceStore.getMessageReference(identity);
+            data = referenceStore.getMessageReference(messageId);
             if (data == null) {
                 return null;
             }
@@ -395,12 +407,7 @@
         Location location = new Location();
         location.setDataFileId(data.getFileId());
         location.setOffset(data.getOffset());
-        DataStructure rc = peristenceAdapter.readCommand(location);
-        try {
-            return (Message)rc;
-        } catch (ClassCastException e) {
-            throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc);
-        }
+        return location;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon Oct 15 12:31:22 2007
@@ -83,7 +83,7 @@
     private TaskRunnerFactory taskRunnerFactory;
     private WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
-    private long cleanupInterval = 1000 * 60;
+    private long cleanupInterval = 1000 * 15;
     private long checkpointInterval = 1000 * 10;
     private int maxCheckpointWorkers = 1;
     private int maxCheckpointMessageAddSize = 1024 * 4;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Mon Oct 15 12:31:22 2007
@@ -25,6 +25,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.impl.async.Location;
@@ -77,7 +78,7 @@
 
     /**
      */
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+    public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
         final boolean debug = LOG.isDebugEnabled();
         JournalTopicAck ack = new JournalTopicAck();
         ack.setDestination(destination);
@@ -92,7 +93,7 @@
             if (debug) {
                 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
             }
-            acknowledge(messageId, location, key);
+            acknowledge(context,messageId, location, clientId,subscriptionName);
         } else {
             if (debug) {
                 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
@@ -109,7 +110,7 @@
                     }
                     synchronized (AMQTopicMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        acknowledge(messageId, location, key);
+                        acknowledge(context,messageId, location, clientId,subscriptionName);
                     }
                 }
 
@@ -142,44 +143,22 @@
      * @param messageId
      * @param location
      * @param key
-     * @throws InterruptedIOException
+     * @throws IOException 
      */
-    protected void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
+    protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
         synchronized (this) {
             lastLocation = location;
-            ackedLastAckLocations.put(key, messageId);
+            if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
+                MessageAck ack = new MessageAck();
+                ack.setLastMessageId(messageId);
+                removeMessage(context, ack);
+            }
         }
         try {
             asyncWriteTask.wakeup();
         } catch (InterruptedException e) {
             throw new InterruptedIOException();
         }
-    }
-
-    @Override
-    protected Location doAsyncWrite() throws IOException {
-        final Map<SubscriptionKey, MessageId> cpAckedLastAckLocations;
-        // swap out the hash maps..
-        synchronized (this) {
-            cpAckedLastAckLocations = this.ackedLastAckLocations;
-            this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
-        }
-        Location location = super.doAsyncWrite();
-
-        if (cpAckedLastAckLocations != null) {
-            transactionTemplate.run(new Callback() {
-                public void execute() throws Exception {
-                    // Checkpoint the acknowledged messages.
-                    Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
-                    while (iterator.hasNext()) {
-                        SubscriptionKey subscriptionKey = iterator.next();
-                        MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
-                        topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
-                    }
-                }
-            });
-        }
-        return location;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Oct 15 12:31:22 2007
@@ -26,6 +26,7 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -120,7 +121,7 @@
         }
         return result.getData();
     }
-
+    
     public void addReferenceFileIdsInUse() {
         for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
             .getNext(entry)) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Oct 15 12:31:22 2007
@@ -117,12 +117,55 @@
         return container;
     }
 
-    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                                         MessageId messageId) throws IOException {
+    public synchronized boolean acknowledgeReference(ConnectionContext context,
+            String clientId, String subscriptionName, MessageId messageId)
+            throws IOException {
+        boolean removeMessage = false;
         String key = getSubscriptionKey(clientId, subscriptionName);
 
         TopicSubContainer container = subscriberMessages.get(key);
         if (container != null) {
+            ConsumerMessageRef ref = null;
+            if((ref = container.remove(messageId)) != null) {
+                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+                if (tsa != null) {
+                    if (tsa.decrementCount() <= 0) {
+                        StoreEntry entry = ref.getAckEntry();
+                        entry = ackContainer.refresh(entry);
+                        ackContainer.remove(entry);
+                        ReferenceRecord rr = messageContainer.get(messageId);
+                        if (rr != null) {
+                            entry = tsa.getMessageEntry();
+                            entry = messageContainer.refresh(entry);
+                            messageContainer.remove(entry);
+                            removeInterest(rr);
+                            removeMessage = true;
+                        }else {
+                            System.err.println("REF RTEC OS NULL!!!");
+                        }
+                    } else {
+                        System.out.println("RED XOUVT IAS " + tsa.getCount());
+                        ackContainer.update(ref.getAckEntry(), tsa);
+                    }
+                }else{
+                    System.err.println("NO TAS!!!");
+                }
+            }else{
+                //no message held
+                removeMessage = true;
+            }
+        }
+        return removeMessage;
+
+    }
+    
+    public synchronized void acknowledge(ConnectionContext context,
+			String clientId, String subscriptionName, MessageId messageId)
+			throws IOException {
+		String key = getSubscriptionKey(clientId, subscriptionName);
+
+		TopicSubContainer container = subscriberMessages.get(key);
+		if (container != null) {
             ConsumerMessageRef ref = container.remove(messageId);
             if (ref != null) {
                 TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
@@ -145,7 +188,7 @@
                 }
             }
         }
-    }
+	}
 
     public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
         String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Mon Oct 15 12:31:22 2007
@@ -81,6 +81,22 @@
         }
         return result;
     }
+    
+    public ConsumerMessageRef removeFirst() {
+		ConsumerMessageRef result = null;
+		if (!listContainer.isEmpty()) {
+			StoreEntry entry = listContainer.getFirst();
+
+			result = (ConsumerMessageRef) listContainer.get(entry);
+			listContainer.remove(entry);
+			if (listContainer != null && batchEntry != null
+					&& (listContainer.isEmpty() || batchEntry.equals(entry))) {
+				reset();
+			}
+
+		}
+		return result;
+	}
 
     public ConsumerMessageRef get(StoreEntry entry) {
         return (ConsumerMessageRef)listContainer.get(entry);