You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/08 16:23:23 UTC

svn commit: r961783 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/ope...

Author: gtully
Date: Thu Jul  8 14:23:21 2010
New Revision: 961783

URL: http://svn.apache.org/viewvc?rev=961783&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2800, https://issues.apache.org/activemq/browse/AMQ-2542, https://issues.apache.org/activemq/browse/AMQ-2803 - implement duplicate checker in transport for a failover: reconnect, using the last seqid from the store. Implemented for kahaDB and JDBC

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
    activemq/trunk/activemq-core/src/main/proto/journal-data.proto
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java Thu Jul  8 14:23:21 2010
@@ -16,35 +16,23 @@
  */
 package org.apache.activemq;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.util.BitArrayBin;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.LRUCache;
 
 /**
  * Provides basic audit functions for Messages
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class ActiveMQMessageAudit {
+public class ActiveMQMessageAudit extends ActiveMQMessageAuditNoSync {
 
-    public static final int DEFAULT_WINDOW_SIZE = 2048;
-    public static final int MAXIMUM_PRODUCER_COUNT = 64;
-    private int auditDepth;
-    private int maximumNumberOfProducersToTrack;
-    private LRUCache<Object, BitArrayBin> map;
+    private static final long serialVersionUID = 1L;
 
     /**
      * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
      * 64
      */
     public ActiveMQMessageAudit() {
-        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
+        super();
     }
 
     /**
@@ -55,198 +43,41 @@ public class ActiveMQMessageAudit {
      *                the system
      */
     public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
-        this.auditDepth = auditDepth;
-        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
-        this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
+        super(auditDepth, maximumNumberOfProducersToTrack);
     }
     
-    /**
-     * @return the auditDepth
-     */
-    public int getAuditDepth() {
-        return auditDepth;
-    }
-
-    /**
-     * @param auditDepth the auditDepth to set
-     */
-    public void setAuditDepth(int auditDepth) {
-        this.auditDepth = auditDepth;
-    }
-
-    /**
-     * @return the maximumNumberOfProducersToTrack
-     */
-    public int getMaximumNumberOfProducersToTrack() {
-        return maximumNumberOfProducersToTrack;
-    }
-
-    /**
-     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
-     */
-    public void setMaximumNumberOfProducersToTrack(
-            int maximumNumberOfProducersToTrack) {
-        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
-        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
-    }
-
-    /**
-     * Checks if this message has been seen before
-     * 
-     * @param message
-     * @return true if the message is a duplicate
-     * @throws JMSException
-     */
-    public boolean isDuplicate(Message message) throws JMSException {
-        return isDuplicate(message.getJMSMessageID());
-    }
-
-    /**
-     * checks whether this messageId has been seen before and adds this
-     * messageId to the list
-     * 
-     * @param id
-     * @return true if the message is a duplicate
-     */
-    public synchronized boolean isDuplicate(String id) {
-        boolean answer = false;
-        String seed = IdGenerator.getSeedFromId(id);
-        if (seed != null) {
-            BitArrayBin bab = map.get(seed);
-            if (bab == null) {
-                bab = new BitArrayBin(auditDepth);
-                map.put(seed, bab);
-            }
-            long index = IdGenerator.getSequenceFromId(id);
-            if (index >= 0) {
-                answer = bab.setBit(index, true);
-            }
+    @Override
+    public boolean isDuplicate(String id) {
+        synchronized (this) {
+            return super.isDuplicate(id);
         }
-        return answer;
     }
 
-    /**
-     * Checks if this message has been seen before
-     * 
-     * @param message
-     * @return true if the message is a duplicate
-     */
-    public boolean isDuplicate(final MessageReference message) {
-        MessageId id = message.getMessageId();
-        return isDuplicate(id);
-    }
-    
-    /**
-     * Checks if this messageId has been seen before
-     * 
-     * @param id
-     * @return true if the message is a duplicate
-     */
-    public synchronized boolean isDuplicate(final MessageId id) {
-        boolean answer = false;
-        
-        if (id != null) {
-            ProducerId pid = id.getProducerId();
-            if (pid != null) {
-                BitArrayBin bab = map.get(pid);
-                if (bab == null) {
-                    bab = new BitArrayBin(auditDepth);
-                    map.put(pid, bab);
-                }
-                answer = bab.setBit(id.getProducerSequenceId(), true);
-            }
+    @Override
+    public boolean isDuplicate(final MessageId id) {
+        synchronized (this) {
+            return super.isDuplicate(id);
         }
-        return answer;
     }
 
-    /**
-     * mark this message as being received
-     * 
-     * @param message
-     */
-    public void rollback(final MessageReference message) {
-        MessageId id = message.getMessageId();
-        rollback(id);
-    }
-    
-    /**
-     * mark this message as being received
-     * 
-     * @param id
-     */
-    public synchronized void rollback(final  MessageId id) {
-        if (id != null) {
-            ProducerId pid = id.getProducerId();
-            if (pid != null) {
-                BitArrayBin bab = map.get(pid);
-                if (bab != null) {
-                    bab.setBit(id.getProducerSequenceId(), false);
-                }
-            }
+    @Override
+    public void rollback(final  MessageId id) {
+        synchronized (this) {
+            super.rollback(id);
         }
     }
     
-    /**
-     * Check the message is in order
-     * @param msg
-     * @return
-     * @throws JMSException
-     */
-    public boolean isInOrder(Message msg) throws JMSException {
-        return isInOrder(msg.getJMSMessageID());
-    }
-    
-    /**
-     * Check the message id is in order
-     * @param id
-     * @return
-     */
-    public synchronized boolean isInOrder(final String id) {
-        boolean answer = true;
-        
-        if (id != null) {
-            String seed = IdGenerator.getSeedFromId(id);
-            if (seed != null) {
-                BitArrayBin bab = map.get(seed);
-                if (bab != null) {
-                    long index = IdGenerator.getSequenceFromId(id);
-                    answer = bab.isInOrder(index);
-                }
-               
-            }
+    @Override
+    public boolean isInOrder(final String id) {
+        synchronized (this) {
+            return super.isInOrder(id);
         }
-        return answer;
     }
     
-    /**
-     * Check the MessageId is in order
-     * @param message 
-     * @return
-     */
-    public synchronized boolean isInOrder(final MessageReference message) {
-        return isInOrder(message.getMessageId());
-    }
-    
-    /**
-     * Check the MessageId is in order
-     * @param id
-     * @return
-     */
-    public synchronized boolean isInOrder(final MessageId id) {
-        boolean answer = false;
-
-        if (id != null) {
-            ProducerId pid = id.getProducerId();
-            if (pid != null) {
-                BitArrayBin bab = map.get(pid);
-                if (bab == null) {
-                    bab = new BitArrayBin(auditDepth);
-                    map.put(pid, bab);
-                }
-                answer = bab.isInOrder(id.getProducerSequenceId());
-
-            }
+    @Override
+    public boolean isInOrder(final MessageId id) {
+        synchronized (this) {
+            return super.isInOrder(id); // delegate to the base implementation under the lock; an unqualified isInOrder(id) would re-enter this override and recurse until StackOverflowError
         }
-        return answer;
     }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=961783&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java Thu Jul  8 14:23:21 2010
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.BitArrayBin;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * Provides basic audit functions for Messages without sync
+ * 
+ * @version $Revision$
+ */
+public class ActiveMQMessageAuditNoSync implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final int DEFAULT_WINDOW_SIZE = 2048;
+    public static final int MAXIMUM_PRODUCER_COUNT = 64;
+    private int auditDepth;
+    private int maximumNumberOfProducersToTrack;
+    private LRUCache<Object, BitArrayBin> map;
+
+    /**
+     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
+     * 64
+     */
+    public ActiveMQMessageAuditNoSync() {
+        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
+    }
+
+    /**
+     * Construct a MessageAudit
+     * 
+     * @param auditDepth range of ids to track
+     * @param maximumNumberOfProducersToTrack number of producers expected in
+     *                the system
+     */
+    public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
+        this.auditDepth = auditDepth;
+        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
+        this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
+    }
+    
+    /**
+     * @return the auditDepth
+     */
+    public int getAuditDepth() {
+        return auditDepth;
+    }
+
+    /**
+     * @param auditDepth the auditDepth to set
+     */
+    public void setAuditDepth(int auditDepth) {
+        this.auditDepth = auditDepth;
+    }
+
+    /**
+     * @return the maximumNumberOfProducersToTrack
+     */
+    public int getMaximumNumberOfProducersToTrack() {
+        return maximumNumberOfProducersToTrack;
+    }
+
+    /**
+     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
+     */
+    public void setMaximumNumberOfProducersToTrack(
+            int maximumNumberOfProducersToTrack) {
+        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
+        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
+    }
+
+    /**
+     * Checks if this message has been seen before
+     * 
+     * @param message
+     * @return true if the message is a duplicate
+     * @throws JMSException
+     */
+    public boolean isDuplicate(Message message) throws JMSException {
+        return isDuplicate(message.getJMSMessageID());
+    }
+
+    /**
+     * checks whether this messageId has been seen before and adds this
+     * messageId to the list
+     * 
+     * @param id
+     * @return true if the message is a duplicate
+     */
+    public boolean isDuplicate(String id) {
+        boolean answer = false;
+        String seed = IdGenerator.getSeedFromId(id);
+        if (seed != null) {
+            BitArrayBin bab = map.get(seed);
+            if (bab == null) {
+                bab = new BitArrayBin(auditDepth);
+                map.put(seed, bab);
+            }
+            long index = IdGenerator.getSequenceFromId(id);
+            if (index >= 0) {
+                answer = bab.setBit(index, true);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Checks if this message has been seen before
+     * 
+     * @param message
+     * @return true if the message is a duplicate
+     */
+    public boolean isDuplicate(final MessageReference message) {
+        MessageId id = message.getMessageId();
+        return isDuplicate(id);
+    }
+    
+    /**
+     * Checks if this messageId has been seen before
+     * 
+     * @param id
+     * @return true if the message is a duplicate
+     */
+    public boolean isDuplicate(final MessageId id) {
+        boolean answer = false;
+        
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab == null) {
+                    bab = new BitArrayBin(auditDepth);
+                    map.put(pid, bab);
+                }
+                answer = bab.setBit(id.getProducerSequenceId(), true);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * clear the audit bit recorded for this message's id (undoes a prior isDuplicate mark)
+     * 
+     * @param message
+     */
+    public void rollback(final MessageReference message) {
+        MessageId id = message.getMessageId();
+        rollback(id);
+    }
+    
+    /**
+     * clear the audit bit recorded for this message id (undoes a prior isDuplicate mark)
+     * 
+     * @param id
+     */
+    public void rollback(final  MessageId id) {
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab != null) {
+                    bab.setBit(id.getProducerSequenceId(), false);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Check the message is in order
+     * @param msg
+     * @return
+     * @throws JMSException
+     */
+    public boolean isInOrder(Message msg) throws JMSException {
+        return isInOrder(msg.getJMSMessageID());
+    }
+    
+    /**
+     * Check the message id is in order
+     * @param id
+     * @return
+     */
+    public boolean isInOrder(final String id) {
+        boolean answer = true;
+        
+        if (id != null) {
+            String seed = IdGenerator.getSeedFromId(id);
+            if (seed != null) {
+                BitArrayBin bab = map.get(seed);
+                if (bab != null) {
+                    long index = IdGenerator.getSequenceFromId(id);
+                    answer = bab.isInOrder(index);
+                }
+               
+            }
+        }
+        return answer;
+    }
+    
+    /**
+     * Check the MessageId is in order
+     * @param message 
+     * @return
+     */
+    public boolean isInOrder(final MessageReference message) {
+        return isInOrder(message.getMessageId());
+    }
+    
+    /**
+     * Check the MessageId is in order
+     * @param id
+     * @return
+     */
+    public boolean isInOrder(final MessageId id) {
+        boolean answer = false;
+
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab == null) {
+                    bab = new BitArrayBin(auditDepth);
+                    map.put(pid, bab);
+                }
+                answer = bab.isInOrder(id.getProducerSequenceId());
+
+            }
+        }
+        return answer;
+    }
+
+    public long getLastSeqId(ProducerId id) {
+        long result = -1;
+        BitArrayBin bab = map.get(id.toString() + ":");
+        if (bab != null) {
+            result = bab.getLastSetIndex();
+        }
+        return result;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Thu Jul  8 14:23:21 2010
@@ -47,7 +47,7 @@ public class ConnectionContext {
     private ConnectionId connectionId;
     private String clientId;
     private String userName;
-    private boolean haAware;
+    private boolean reconnect;
     private WireFormatInfo wireFormatInfo;
     private Object longTermStoreContext;
     private boolean producerFlowControl = true;
@@ -86,7 +86,7 @@ public class ConnectionContext {
         rc.connectionId = this.connectionId;
         rc.clientId = this.clientId;
         rc.userName = this.userName;
-        rc.haAware = this.haAware;
+        rc.reconnect = this.reconnect;
         rc.wireFormatInfo = this.wireFormatInfo;
         rc.longTermStoreContext = this.longTermStoreContext;
         rc.producerFlowControl = this.producerFlowControl;
@@ -212,12 +212,12 @@ public class ConnectionContext {
         this.clientId = clientId;
     }
 
-    public boolean isHaAware() {
-        return haAware;
+    public boolean isReconnect() {
+        return reconnect;
     }
 
-    public void setHaAware(boolean haAware) {
-        this.haAware = haAware;
+    public void setReconnect(boolean reconnect) {
+        this.reconnect = reconnect;
     }
 
     public WireFormatInfo getWireFormatInfo() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Jul  8 14:23:21 2010
@@ -18,7 +18,10 @@ package org.apache.activemq.broker;
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.state.ProducerState;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Holds internal state in the broker for a MessageProducer
@@ -27,11 +30,13 @@ import org.apache.activemq.state.Produce
  */
 public class ProducerBrokerExchange {
 
+    private static final Log LOG = LogFactory.getLog(ProducerBrokerExchange.class);
     private ConnectionContext connectionContext;
     private Destination regionDestination;
     private Region region;
     private ProducerState producerState;
     private boolean mutable = true;
+    private long lastSendSequenceNumber = -1;
     
     public ProducerBrokerExchange() {
     }
@@ -117,4 +122,25 @@ public class ProducerBrokerExchange {
         this.producerState = producerState;
     }
 
+    /**
+     * Enforce duplicate suppression using info from persistence adapter
+     * @param messageSend
+     * @return false if message should be ignored as a duplicate
+     */
+    public boolean canDispatch(Message messageSend) {
+        boolean canDispatch = true;
+        if (lastSendSequenceNumber > 0) {
+            if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
+                canDispatch = false;
+                LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" 
+                        + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: "  + lastSendSequenceNumber);
+            }
+        }
+        return canDispatch;
+    }
+
+    public void setLastStoredSequenceId(long l) {
+        lastSendSequenceNumber = l;
+        LOG.debug("last stored sequence id set: " + l);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jul  8 14:23:21 2010
@@ -453,7 +453,9 @@ public class TransportConnection impleme
     public Response processMessage(Message messageSend) throws Exception {
         ProducerId producerId = messageSend.getProducerId();
         ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
-        broker.send(producerExchange, messageSend);
+        if (producerExchange.canDispatch(messageSend)) {
+            broker.send(producerExchange, messageSend);
+        }
         return null;
     }
 
@@ -680,6 +682,7 @@ public class TransportConnection impleme
         context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
         context.setUserName(info.getUserName());
         context.setWireFormatInfo(wireFormatInfo);
+        context.setReconnect(info.isFailoverReconnect());
         this.manageable = info.isManageable();
         state.setContext(context);
         state.setConnection(this);
@@ -1249,13 +1252,16 @@ public class TransportConnection impleme
         }
     }
 
-    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
+    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
         ProducerBrokerExchange result = producerExchanges.get(id);
         if (result == null) {
             synchronized (producerExchanges) {
                 result = new ProducerBrokerExchange();
-                TransportConnectionState state = lookupConnectionState(id);
+                TransportConnectionState state = lookupConnectionState(id);              
                 context = state.getContext();
+                if (context.isReconnect()) {
+                    result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
+                }
                 result.setConnectionContext(context);
                 SessionState ss = state.getSessionState(id.getParentId());
                 if (ss != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Jul  8 14:23:21 2010
@@ -374,6 +374,7 @@ public abstract class AbstractRegion imp
                     LOG.warn("Ack for non existent subscription, ack:" + ack);
                     throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
                 } else {
+                    LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
                     return;
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Thu Jul  8 14:23:21 2010
@@ -37,6 +37,7 @@ public class ConnectionInfo extends Base
     protected boolean clientMaster = true;
     protected boolean faultTolerant = false;
     protected transient Object transportContext;
+    private boolean failoverReconnect;
 
     public ConnectionInfo() {
     }
@@ -216,4 +217,15 @@ public class ConnectionInfo extends Base
         this.faultTolerant = faultTolerant;
     }
 
+    /**
+     * @openwire:property version=6 cache=false
+     * @return failoverReconnect true if this is a reconnect
+     */
+    public boolean isFailoverReconnect() {
+        return this.failoverReconnect;
+    }
+
+    public void setFailoverReconnect(boolean failoverReconnect) {
+        this.failoverReconnect = failoverReconnect;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java Thu Jul  8 14:23:21 2010
@@ -86,6 +86,7 @@ public class ConnectionInfoMarshaller ex
         info.setManageable(bs.readBoolean());
         info.setClientMaster(bs.readBoolean());
         info.setFaultTolerant(bs.readBoolean());
+        info.setFailoverReconnect(bs.readBoolean());
 
     }
 
@@ -107,6 +108,7 @@ public class ConnectionInfoMarshaller ex
         bs.writeBoolean(info.isManageable());
         bs.writeBoolean(info.isClientMaster());
         bs.writeBoolean(info.isFaultTolerant());
+        bs.writeBoolean(info.isFailoverReconnect());
 
         return rc + 0;
     }
@@ -131,6 +133,7 @@ public class ConnectionInfoMarshaller ex
         bs.readBoolean();
         bs.readBoolean();
         bs.readBoolean();
+        bs.readBoolean();
 
     }
 
@@ -165,6 +168,7 @@ public class ConnectionInfoMarshaller ex
         info.setManageable(dataIn.readBoolean());
         info.setClientMaster(dataIn.readBoolean());
         info.setFaultTolerant(dataIn.readBoolean());
+        info.setFailoverReconnect(dataIn.readBoolean());
 
     }
 
@@ -186,6 +190,7 @@ public class ConnectionInfoMarshaller ex
         dataOut.writeBoolean(info.isManageable());
         dataOut.writeBoolean(info.isClientMaster());
         dataOut.writeBoolean(info.isFaultTolerant());
+        dataOut.writeBoolean(info.isFailoverReconnect());
 
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Jul  8 14:23:21 2010
@@ -140,6 +140,7 @@ public class ConnectionStateTracker exte
         // Restore the connections.
         for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
             ConnectionState connectionState = iter.next();
+            connectionState.getInfo().setFailoverReconnect(true);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
             }
@@ -156,6 +157,9 @@ public class ConnectionStateTracker exte
         }
         //now flush messages
         for (Message msg:messageCache.values()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("message: " + msg.getMessageId());
+            }
             transport.oneway(msg);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.usage.SystemUsage;
 
 /**
@@ -157,4 +158,13 @@ public interface PersistenceAdapter exte
      * @return disk space used in bytes of 0 if not implemented
      */
     long size();
+
+    /**
+     * return the last stored producer sequenceId for this producer Id
+     * used to suppress duplicate sends on failover reconnect at the transport
+     * when a reconnect occurs
+     * @param id the producerId to find a sequenceId for
+     * @return the last stored sequence id or -1 if no suppression needed
+     */
+    long getLastProducerSequenceId(ProducerId id) throws IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -43,6 +43,7 @@ import org.apache.activemq.command.Journ
 import org.apache.activemq.command.JournalTrace;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
@@ -1117,4 +1118,10 @@ public class AMQPersistenceAdapter imple
 	           + ".DisableLocking",
 	           "false"));
 	}
+
+	
+    public long getLastProducerSequenceId(ProducerId id) {
+        // reference store send has adequate duplicate suppression
+        return -1;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Jul  8 14:23:21 2010
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.Set;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
 
 /**
@@ -60,7 +61,7 @@ public interface JDBCAdapter {
 
     SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
 
-    long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
+    long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
 
     void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
 
@@ -85,4 +86,6 @@ public interface JDBCAdapter {
     long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
 
     void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
+
+    long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Jul  8 14:23:21 2010
@@ -285,7 +285,7 @@ public class JDBCMessageStore extends Ab
         long result = -1;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            result = adapter.getStoreSequenceId(c, messageId);
+            result = adapter.getStoreSequenceId(c, destination, messageId);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -37,6 +37,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -93,7 +94,7 @@ public class JDBCPersistenceAdapter exte
     
     protected int maxProducersToAudit=1024;
     protected int maxAuditDepth=1000;
-    protected boolean enableAudit=true;
+    protected boolean enableAudit=false;
     protected int auditRecoveryDepth = 1024;
     protected ActiveMQMessageAudit audit;
     
@@ -245,6 +246,19 @@ public class JDBCPersistenceAdapter exte
             c.close();
         }
     }
+    
+    public long getLastProducerSequenceId(ProducerId id) throws IOException {
+        TransactionContext c = getTransactionContext();
+        try { // the transaction context is always closed in the finally block below
+            return getAdapter().doGetLastProducerSequenceId(c, id);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to get last producer sequence id for producer: " + id + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+    }
+
 
     public void start() throws Exception {
         getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
@@ -699,6 +713,5 @@ public class JDBCPersistenceAdapter exte
         synchronized(sequenceGenerator) {
             return sequenceGenerator.getNextSequenceId();
         }
-    }
-    
+    }    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Jul  8 14:23:21 2010
@@ -49,7 +49,7 @@ public class JDBCTopicMessageStore exten
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-        	long seq = adapter.getStoreSequenceId(c, messageId);
+        	long seq = adapter.getStoreSequenceId(c, destination, messageId);
             adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Jul  8 14:23:21 2010
@@ -67,6 +67,7 @@ public class Statements {
     private String findNextMessagesStatement;
     private boolean useLockCreateWhereClause;
     private String findAllMessageIdsStatement;
+    private String lastProducerSequenceIdStatement;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -128,7 +129,7 @@ public class Statements {
     public String getFindMessageSequenceIdStatement() {
         if (findMessageSequenceIdStatement == null) {
             findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
-                                             + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
+                                             + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
         }
         return findMessageSequenceIdStatement;
     }
@@ -172,6 +173,15 @@ public class Statements {
         return findLastSequenceIdInMsgsStatement;
     }
 
+    public String getLastProducerSequenceIdStatement() {
+        if (lastProducerSequenceIdStatement == null) {
+            lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName()
+                                            + " WHERE MSGID_PROD=?";
+        }
+        return lastProducerSequenceIdStatement;
+    }
+
+
     public String getFindLastSequenceIdInAcksStatement() {
         if (findLastSequenceIdInAcksStatement == null) {
             findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName();
@@ -656,4 +666,9 @@ public class Statements {
         this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement;
     }
 
+
+    public void setLastProducerSequenceIdStatement(String lastProducerSequenceIdStatement) {
+        this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement;
+    }
+
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Jul  8 14:23:21 2010
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
@@ -246,13 +247,14 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
+    public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
             s.setString(1, messageID.getProducerId().toString());
             s.setLong(2, messageID.getProducerSequenceId());
+            s.setString(3, destination.getQualifiedName());
             rs = s.executeQuery();
             if (!rs.next()) {
                 return 0;
@@ -819,4 +821,23 @@ public class DefaultJDBCAdapter implemen
      * try { s.close(); } catch (Throwable ignore) {} } }
      */
 
+    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
+            throws SQLException, IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
+            s.setString(1, id.toString());
+            rs = s.executeQuery();
+            if (rs.next()) {
+                long seq = rs.getLong(1);
+                return rs.wasNull() ? -1 : seq; // MAX() over no rows is SQL NULL, which getLong() reports as 0
+            }
+            return -1; // no row at all: nothing stored for this producer
+        } finally {
+            close(rs);
+            close(s);
+        }
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -50,6 +50,7 @@ import org.apache.activemq.command.Journ
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
@@ -745,4 +746,8 @@ public class JournalPersistenceAdapter i
         }
     }
 
+    public long getLastProducerSequenceId(ProducerId id) {
+        return -1;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -32,6 +32,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
@@ -369,6 +370,11 @@ public class KahaPersistenceAdapter impl
 	public void setBrokerService(BrokerService brokerService) {
 		this.brokerService = brokerService;
 	}
+
+    public long getLastProducerSequenceId(ProducerId id) {
+        // reference store send has adequate duplicate suppression
+        return -1;
+    }
   
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
@@ -124,6 +125,10 @@ public class KahaDBPersistenceAdapter im
         return this.letter.getLastMessageBrokerSequenceId();
     }
 
+    public long getLastProducerSequenceId(ProducerId id) throws IOException {
+        return this.letter.getLastProducerSequenceId(id);
+    }
+
     /**
      * @param destination
      * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
@@ -209,6 +214,29 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * Set the max number of producers (LRU cache) to track for duplicate sends
+     */
+    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
+        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
+    }
+    
+    public int getMaxFailoverProducersToTrack() {
+        return this.letter.getMaxFailoverProducersToTrack();
+    }
+
+    /**
+     * set the audit window depth for duplicate suppression (should exceed the max transaction
+     * batch)
+     */
+    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
+        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
+    }
+    
+    public int getFailoverProducersAuditDepth() {
+        return this.letter.getFailoverProducersAuditDepth(); // read from the wrapped store, as the setter writes to it
+    }
+    
+    /**
      * Get the checkpointInterval
      * 
      * @return the checkpointInterval
@@ -477,4 +505,5 @@ public class KahaDBPersistenceAdapter im
         String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
         return "KahaDBPersistenceAdapter[" + path + "]";
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Jul  8 14:23:21 2010
@@ -48,6 +48,7 @@ import org.apache.activemq.command.Local
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
@@ -363,6 +364,7 @@ public class KahaDBStore extends Message
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
+            
         }
 
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -901,6 +903,15 @@ public class KahaDBStore extends Message
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
+    
+    public long getLastProducerSequenceId(ProducerId id) {
+        indexLock.readLock().lock();
+        try {
+            return metadata.producerSequenceIdTracker.getLastSeqId(id);
+        } finally {
+            indexLock.readLock().unlock();
+        }
+    }
 
     public long size() {
         if (!isStarted()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jul  8 14:23:21 2010
@@ -16,11 +16,15 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,22 +39,25 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaEntryType;
 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@@ -93,9 +100,9 @@ public class MessageDatabase extends Ser
     private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
     private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
-     static final int CLOSED_STATE = 1;
-     static final int OPEN_STATE = 2;
-     static final long NOT_ACKED = -1;
+    static final int CLOSED_STATE = 1;
+    static final int OPEN_STATE = 2;
+    static final long NOT_ACKED = -1;
 
 
     protected class Metadata {
@@ -104,6 +111,8 @@ public class MessageDatabase extends Ser
         protected BTreeIndex<String, StoredDestination> destinations;
         protected Location lastUpdate;
         protected Location firstInProgressTransactionLocation;
+        protected Location producerSequenceIdTrackerLocation = null;
+        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
 
         public void read(DataInput is) throws IOException {
             state = is.readInt();
@@ -118,6 +127,14 @@ public class MessageDatabase extends Ser
             } else {
                 firstInProgressTransactionLocation = null;
             }
+            try {
+                if (is.readBoolean()) {
+                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
+                } else {
+                    producerSequenceIdTrackerLocation = null;
+                }
+            } catch (EOFException expectedOnUpgrade) {
+            }
         }
 
         public void write(DataOutput os) throws IOException {
@@ -137,6 +154,13 @@ public class MessageDatabase extends Ser
             } else {
                 os.writeBoolean(false);
             }
+            
+            if (producerSequenceIdTrackerLocation != null) {
+                os.writeBoolean(true);
+                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
+            } else {
+                os.writeBoolean(false);
+            }
         }
     }
 
@@ -154,7 +178,7 @@ public class MessageDatabase extends Ser
 
     protected PageFile pageFile;
 	protected Journal journal;
-    protected Metadata metadata = new Metadata();
+	protected Metadata metadata = new Metadata();
 
     protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
 
@@ -171,7 +195,8 @@ public class MessageDatabase extends Ser
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
-    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
+    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+    
     
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
@@ -381,15 +406,15 @@ public class MessageDatabase extends Ser
     private Location getFirstInProgressTxLocation() {
         Location l = null;
         synchronized (inflightTransactions) {
-        if (!inflightTransactions.isEmpty()) {
-            l = inflightTransactions.values().iterator().next().get(0).getLocation();
-        }
-        if (!preparedTransactions.isEmpty()) {
-            Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
-            if (l==null || t.compareTo(l) <= 0) {
-                l = t;
+            if (!inflightTransactions.isEmpty()) {
+                l = inflightTransactions.values().iterator().next().get(0).getLocation();
+            }
+            if (!preparedTransactions.isEmpty()) {
+                Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
+                if (l==null || t.compareTo(l) <= 0) {
+                    l = t;
+                }
             }
-        }
         }
         return l;
     }
@@ -407,21 +432,25 @@ public class MessageDatabase extends Ser
         try {
             
 	        long start = System.currentTimeMillis();        
-	        Location recoveryPosition = getRecoveryPosition();
-	        if( recoveryPosition!=null ) {
-		        int redoCounter = 0;
-		        LOG.info("Recoverying from the journal ...");
-		        while (recoveryPosition != null) {
-		            JournalCommand message = load(recoveryPosition);
-		            metadata.lastUpdate = recoveryPosition;
-		            process(message, recoveryPosition);
-		            redoCounter++;
-		            recoveryPosition = journal.getNextLocation(recoveryPosition);
-		        }
-		        long end = System.currentTimeMillis();
-	        	LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+	        Location producerAuditPosition = recoverProducerAudit();
+	        Location lastIndoubtPosition = getRecoveryPosition();
+	        
+	        Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+	            
+	        if (recoveryPosition != null) {  
+	            int redoCounter = 0;
+	            LOG.info("Recoverying from the journal ...");
+	            while (recoveryPosition != null) {
+	                JournalCommand<?> message = load(recoveryPosition);
+	                metadata.lastUpdate = recoveryPosition;
+	                process(message, recoveryPosition, lastIndoubtPosition);
+	                redoCounter++;
+	                recoveryPosition = journal.getNextLocation(recoveryPosition);
+	            }
+	            long end = System.currentTimeMillis();
+	            LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
 	        }
-	     
+	        
 	        // We may have to undo some index updates.
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -433,7 +462,39 @@ public class MessageDatabase extends Ser
         }
     }
     
-	protected void recoverIndex(Transaction tx) throws IOException {
+	private Location minimum(Location producerAuditPosition,
+            Location lastIndoubtPosition) {
+	    // smaller of the two journal positions; a null argument simply yields the other one
+	    if (producerAuditPosition == null) {
+	        return lastIndoubtPosition;
+	    }
+	    if (lastIndoubtPosition == null) {
+	        return producerAuditPosition;
+	    }
+	    // ties go to producerAuditPosition, only a strictly smaller in-doubt position wins
+	    boolean inDoubtIsSmaller = lastIndoubtPosition.compareTo(producerAuditPosition) < 0;
+	    return inDoubtIsSmaller ? lastIndoubtPosition : producerAuditPosition;
+    }
+	
+	private Location recoverProducerAudit() throws IOException {
+	    // reload the checkpointed producer audit (if any) and return the journal position to replay from
+	    if (metadata.producerSequenceIdTrackerLocation == null) {
+	        // no audit was checkpointed, so it has to be rebuilt by replaying the journal from its start
+	        return journal.getNextLocation(null);
+	    }
+	    KahaProducerAuditCommand stored = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
+	    try {
+	        ObjectInputStream serialized = new ObjectInputStream(stored.getAudit().newInput());
+	        metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) serialized.readObject();
+	    } catch (ClassNotFoundException cfe) {
+	        IOException wrapped = new IOException("Failed to read producerAudit: " + cfe);
+	        wrapped.initCause(cfe);
+	        throw wrapped;
+	    }
+	    return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
+    }
+
+    protected void recoverIndex(Transaction tx) throws IOException {
         long start = System.currentTimeMillis();
         // It is possible index updates got applied before the journal updates.. 
         // in that case we need to removed references to messages that are not in the journal
@@ -457,6 +518,7 @@ public class MessageDatabase extends Ser
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                 sd.locationIndex.remove(tx, keys.location);
                 sd.messageIdIndex.remove(tx, keys.messageId);
+                metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
                 undoCounter++;
                 // TODO: do we need to modify the ack positions for the pub sub case?
 			}
@@ -588,7 +650,7 @@ public class MessageDatabase extends Ser
 	        while (nextRecoveryPosition != null) {
 	        	lastRecoveryPosition = nextRecoveryPosition;
 	            metadata.lastUpdate = lastRecoveryPosition;
-	            JournalCommand message = load(lastRecoveryPosition);
+	            JournalCommand<?> message = load(lastRecoveryPosition);
 	            process(message, lastRecoveryPosition);            
 	            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
 	        }
@@ -601,8 +663,8 @@ public class MessageDatabase extends Ser
         return metadata.lastUpdate;
     }
     
-	private Location getRecoveryPosition() throws IOException {
-		
+    private Location getRecoveryPosition() throws IOException {
+        
         // If we need to recover the transactions..
         if (metadata.firstInProgressTransactionLocation != null) {
             return metadata.firstInProgressTransactionLocation;
@@ -613,7 +675,7 @@ public class MessageDatabase extends Ser
             // Start replay at the record after the last one recorded in the index file.
             return journal.getNextLocation(metadata.lastUpdate);
         }
-        
+	    
         // This loads the first position.
         return journal.getNextLocation(null);
 	}
@@ -658,7 +720,7 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
     // Methods call by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
-    public Location store(JournalCommand data) throws IOException {
+    public Location store(JournalCommand<?> data) throws IOException {
         return store(data, false, null,null);
     }
 
@@ -669,7 +731,7 @@ public class MessageDatabase extends Ser
      * during a recovery process.
      * @param done 
      */
-    public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException {
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
     	if (before != null) {
     	    before.run();
     	}
@@ -716,7 +778,7 @@ public class MessageDatabase extends Ser
      * @return
      * @throws IOException
      */
-    public JournalCommand load(Location location) throws IOException {
+    public JournalCommand<?> load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
@@ -724,10 +786,30 @@ public class MessageDatabase extends Ser
         if( type == null ) {
             throw new IOException("Could not load journal record. Invalid location: "+location);
         }
-        JournalCommand message = (JournalCommand)type.createMessage();
+        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
         message.mergeFramed(is);
         return message;
     }
+    
+    /**
+     * Recovery variant of process: a record at or after inDoubtlocation is handed to
+     * process(data, location); any other record (or every record when inDoubtlocation
+     * is null) only has its KahaAddMessageCommand message id fed to the producer audit.
+     * @throws IOException propagated from process(data, location) or from the visit
+     */
+    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
+        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
+            process(data, location);
+            return;
+        }
+        // just recover producer audit
+        data.visit(new Visitor() {
+            @Override
+            public void visit(KahaAddMessageCommand command) throws IOException {
+                metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); // result unused, the call records the id
+            }
+        });
+    }
 
     // /////////////////////////////////////////////////////////////////
     // Journaled record processing methods. Once the record is journaled,
@@ -735,7 +817,7 @@ public class MessageDatabase extends Ser
     // from the recovery method too so they need to be idempotent
     // /////////////////////////////////////////////////////////////////
 
-    void process(JournalCommand data, final Location location) throws IOException {
+    void process(JournalCommand<?> data, final Location location) throws IOException {
         data.visit(new Visitor() {
             @Override
             public void visit(KahaAddMessageCommand command) throws IOException {
@@ -911,7 +993,7 @@ public class MessageDatabase extends Ser
         if( previous == null ) {
             previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
             if( previous == null ) {
-                sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+                sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));            
             } else {
                 // If the message ID as indexed, then the broker asked us to store a DUP
                 // message.  Bad BOY!  Don't do it, and log a warning.
@@ -927,7 +1009,8 @@ public class MessageDatabase extends Ser
             // TODO: consider just rolling back the tx.
             sd.locationIndex.put(tx, location, previous);
         }
-        
+        // record this id in any event, initial send or recovery
+        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
     }
 
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@@ -1025,6 +1108,7 @@ public class MessageDatabase extends Ser
         LOG.debug("Checkpoint started.");
         
         metadata.state = OPEN_STATE;
+        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
         metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
         tx.store(metadata.page, metadataMarshaller, true);
         pageFile.flush();
@@ -1111,6 +1195,15 @@ public class MessageDatabase extends Ser
         LOG.debug("Checkpoint done.");
     }
     
+    private Location checkpointProducerAudit() throws IOException {
+        // java-serialize the current producer audit and journal it as a KahaProducerAuditCommand
+        ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+        ObjectOutputStream objectOut = new ObjectOutputStream(serialized);
+        objectOut.writeObject(metadata.producerSequenceIdTracker);
+        objectOut.close(); // close() flushes first, so the byte array below is complete
+        return store(new KahaProducerAuditCommand().setAudit(new Buffer(serialized.toByteArray())));
+    }
+
     public HashSet<Integer> getJournalFilesBeingReplicated() {
 		return journalFilesBeingReplicated;
 	}
@@ -1580,6 +1673,22 @@ public class MessageDatabase extends Ser
         return journalMaxFileLength;
     }
     
+    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { // forwards to the producer audit held in metadata
+        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); // NOTE(review): recoverProducerAudit() swaps in a deserialized tracker - confirm this setting survives recovery
+    }
+    
+    public int getMaxFailoverProducersToTrack() { // reads the value straight from the producer audit held in metadata
+        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
+    }
+    
+    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { // forwards to the producer audit held in metadata
+        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); // NOTE(review): recoverProducerAudit() swaps in a deserialized tracker - confirm this setting survives recovery
+    }
+    
+    public int getFailoverProducersAuditDepth() { // reads the value straight from the producer audit held in metadata
+        return this.metadata.producerSequenceIdTracker.getAuditDepth();
+    }
+    
     public PageFile getPageFile() {
         if (pageFile == null) {
             pageFile = createPageFile();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Thu Jul  8 14:23:21 2010
@@ -33,6 +33,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
@@ -569,5 +570,9 @@ public class TempKahaDBStore extends Tem
             throw new IllegalArgumentException("Not in the valid destination format");
         }
     }
+    
+    public long getLastProducerSequenceId(ProducerId id) { // the id argument is ignored
+        return -1; // NOTE(review): presumably means "no sequence id known" for this temp store - confirm against the PersistenceAdapter contract
+    }
         
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java Thu Jul  8 14:23:21 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@@ -52,5 +53,8 @@ public class Visitor {
 
     public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException {
     }
+    
+    public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException { // empty default; visitors override only the visit methods they need
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Thu Jul  8 14:23:21 2010
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ProxyMessageStore;
@@ -201,4 +202,9 @@ public class MemoryPersistenceAdapter im
             createTransactionStore();
         }
     }
+
+    public long getLastProducerSequenceId(ProducerId id) {
+        // memory map does duplicate suppression, so no last sequence id is tracked here: -1 for every producer
+        return -1;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jul  8 14:23:21 2010
@@ -727,7 +727,7 @@ public class FailoverTransport implement
         for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
             Command command = iter2.next();
             if (LOG.isTraceEnabled()) {
-                LOG.trace("restore, replay: " + command);
+                LOG.trace("restore requestMap, replay: " + command);
             }
             t.oneway(command);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java Thu Jul  8 14:23:21 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.util;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * Simple BitArray to enable setting multiple boolean values efficently Used
@@ -27,7 +28,10 @@ import java.io.IOException;
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class BitArray {
+public class BitArray implements Serializable {
+    
+    private static final long serialVersionUID = 1L;
+    
     static final int LONG_SIZE = 64;
     static final int INT_SIZE = 32;
     static final int SHORT_SIZE = 16;
@@ -113,6 +117,14 @@ public class BitArray {
         this.bits = bits;
     }
 
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException { // java serialization hook (class is Serializable)
+        writeToStream(out); // no defaultWriteObject(): the stream carries only what writeToStream emits
+    }
+    
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { // java serialization hook, counterpart of writeObject
+        readFromStream(in); // no defaultReadObject(): state is restored solely by readFromStream
+    }
+    
     /**
      * write the bits to an output stream
      * 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Thu Jul  8 14:23:21 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.util;
 
+import java.io.Serializable;
 import java.util.LinkedList;
 
 /**
@@ -23,8 +24,9 @@ import java.util.LinkedList;
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class BitArrayBin {
+public class BitArrayBin implements Serializable {
 
+    private static final long serialVersionUID = 1L;
     private LinkedList<BitArray> list;
     private int maxNumberOfArrays;
     private int firstIndex = -1;
@@ -162,4 +164,22 @@ public class BitArrayBin {
         }
         return answer;
     }
+
+    public long getLastSetIndex() {
+        // -1 while firstIndex is unset (negative); otherwise derived from the last non-null bin
+        if (firstIndex < 0) {
+            return -1;
+        }
+        // single backwards pass; LinkedList.get(i) per step would re-walk the list each time
+        java.util.ListIterator<BitArray> bins = list.listIterator(maxNumberOfArrays);
+        while (bins.hasPrevious()) {
+            int binIndex = bins.previousIndex();
+            BitArray bin = bins.previous();
+            if (bin != null) {
+                // firstIndex + offset of this bin + its (length() - 1); long math avoids int overflow
+                return firstIndex + ((long) binIndex * BitArray.LONG_SIZE) + (bin.length() - 1);
+            }
+        }
+        return firstIndex; // every bin is null: fall back to firstIndex itself
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (original)
+++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Thu Jul  8 14:23:21 2010
@@ -29,6 +29,7 @@ enum KahaEntryType {
   KAHA_ROLLBACK_COMMAND = 5;
   KAHA_REMOVE_DESTINATION_COMMAND = 6;
   KAHA_SUBSCRIPTION_COMMAND = 7;
+  KAHA_PRODUCER_AUDIT_COMMAND = 8;
 }
 
 message KahaTraceCommand {
@@ -109,6 +110,18 @@ message KahaSubscriptionCommand {
   optional bytes subscriptionInfo = 4;
 }
 
+message KahaProducerAuditCommand {
+  // We make use of the wonky comment style below because the following options
+  // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+  // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaProducerAuditCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+  // audit: the java-serialized producer audit that MessageDatabase journals at checkpoint time
+  required bytes audit = 1;
+}
+
 message KahaDestination {
   enum DestinationType {
     QUEUE = 0;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java Thu Jul  8 14:23:21 2010
@@ -152,6 +152,7 @@ public class PerDestinationStoreLimitTes
             Thread.sleep(1000);
             // the producer is blocked once the done flag stays true
             if (done.get()) {
+                LOG.info("Blocked....");
                 break;
             }
             done.set(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java Thu Jul  8 14:23:21 2010
@@ -66,5 +66,6 @@ public class ConnectionInfoTest extends 
         info.setManageable(false);
         info.setClientMaster(true);
         info.setFaultTolerant(false);
+        info.setFailoverReconnect(true);
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java Thu Jul  8 14:23:21 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -120,6 +121,23 @@ public abstract class StoreOrderTest {
     }
     
     @Test
+    public void testCompositeSendReceiveAfterRestart() throws Exception {
+        destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest"); // comma-separated name: presumably a composite of the two queues read below
+        enqueueOneMessage();
+        // restart the broker, then expect one message on each of the two queues
+        LOG.info("restart broker");
+        stopBroker();
+        broker = createRestartedBroker();
+        dumpMessages();
+        initConnection();
+        destination = new ActiveMQQueue("StoreOrderTest"); // first queue named in the send destination
+        assertNotNull("got one message from first dest", receiveOne());
+        dumpMessages();
+        destination = new ActiveMQQueue("SecondStoreOrderTest"); // second queue named in the send destination
+        assertNotNull("got one message from second dest", receiveOne());
+    }
+    
+    @Test
     public void validateUnorderedTxCommit() throws Exception {
         
         Executor executor = Executors.newCachedThreadPool();
@@ -247,6 +265,7 @@ public abstract class StoreOrderTest {
         PolicyEntry defaultEntry = new PolicyEntry();
         defaultEntry.setMemoryLimit(1024*3);
         defaultEntry.setCursorMemoryHighWaterMark(68);
+        defaultEntry.setExpireMessagesPeriod(0);
         map.setDefaultEntry(defaultEntry);
         brokerService.setDestinationPolicy(map);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Thu Jul  8 14:23:21 2010
@@ -28,6 +28,11 @@ public class JDBCPersistenceAdapterTest 
     
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        
+        // explicitly enable audit as it is now off by default
+        // due to org.apache.activemq.broker.ProducerBrokerExchange.canDispatch(Message)
+        jdbc.setEnableAudit(true);
+        
         brokerService.setSchedulerSupport(false);
         brokerService.setPersistenceAdapter(jdbc);
         jdbc.setBrokerService(brokerService);
@@ -56,6 +61,5 @@ public class JDBCPersistenceAdapterTest 
     	if (!failed) {
     		fail("Should have failed with audit turned off");
     	}
-    }
-    
+    }   
 }