You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/07 19:53:55 UTC

svn commit: r822813 - in /activemq/branches/activemq-5.3: ./ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/main/java/org/apache/activemq/transport/stomp/ activemq-core/src/test/java/org/apache/activemq/store/ activemq-...

Author: chirino
Date: Wed Oct  7 17:53:55 2009
New Revision: 822813

URL: http://svn.apache.org/viewvc?rev=822813&view=rev
Log:
AMQ-2439: KahaDB + Network of Brokers + Restart = Duplicate Messages that cannot be removed from the data store
 - merges in 821090 821103 821115

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
      - copied, changed from r821103, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
      - copied unchanged from r821115, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
      - copied unchanged from r821103, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
Modified:
    activemq/branches/activemq-5.3/   (props changed)
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java   (props changed)

Propchange: activemq/branches/activemq-5.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct  7 17:53:55 2009
@@ -1 +1 @@
-/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764
+/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=822813&r1=822812&r2=822813&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Oct  7 17:53:55 2009
@@ -22,7 +22,17 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -49,13 +59,22 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.*;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
 
 public class MessageDatabase {
 
@@ -808,12 +827,22 @@
         long id = sd.nextMessageId++;
         Long previous = sd.locationIndex.put(tx, location, id);
         if( previous == null ) {
-            sd.messageIdIndex.put(tx, command.getMessageId(), id);
-            sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
+            if( previous == null ) {
+                sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+            } else {
+                // If the message ID as indexed, then the broker asked us to store a DUP
+                // message.  Bad BOY!  Don't do it, and log a warning.
+                LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId());
+                // TODO: consider just rolling back the tx.
+                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
+            }
         } else {
             // restore the previous value.. Looks like this was a redo of a previously
-            // added message.  We don't want to assing it a new id as the other indexes would 
+            // added message.  We don't want to assign it a new id as the other indexes would 
             // be wrong..
+            //
+            // TODO: consider just rolling back the tx.
             sd.locationIndex.put(tx, location, previous);
         }
         

Propchange: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct  7 17:53:55 2009
@@ -1 +1 @@
-/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764
+/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764,821090,821103,821115

Copied: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java (from r821103, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java?p2=activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java&r1=821103&r2=822813&rev=822813&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java Wed Oct  7 17:53:55 2009
@@ -54,15 +54,21 @@
 
         
         MessageStore ms = pa.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        ConnectionContext context = new ConnectionContext();
 
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         message.setText("test");
-        message.setMessageId(new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:1"));
-        ConnectionContext context = new ConnectionContext();
-
+        MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:1");
+        messageId.setBrokerSequenceId(1);
+        message.setMessageId(messageId);
         ms.addMessage(context, message);
 
         // here comes the dup...
+        message = new ActiveMQTextMessage();
+        message.setText("test");
+        messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:1");
+        messageId.setBrokerSequenceId(2);
+        message.setMessageId(messageId);
         ms.addMessage(context, message);
 
         final AtomicInteger recovered = new AtomicInteger();
@@ -85,7 +91,6 @@
                 return true;
             }
         });
-        
         assertEquals(1, recovered.get());
 
     }