You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/29 00:13:34 UTC

svn commit: r729803 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/BrokerService.java store/jdbc/adapter/DefaultJDBCAdapter.java

Author: rajdavies
Date: Sun Dec 28 15:13:32 2008
New Revision: 729803

URL: http://svn.apache.org/viewvc?rev=729803&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1918

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=729803&r1=729802&r2=729803&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sun Dec 28 15:13:32 2008
@@ -21,8 +21,6 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
-import java.security.KeyManagementException;
-import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,13 +30,9 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
@@ -83,7 +77,6 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.tcp.SslTransportFactory;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=729803&r1=729802&r2=729803&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Sun Dec 28 15:13:32 2008
@@ -23,7 +23,10 @@
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.TreeSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -36,12 +39,10 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Implements all the default JDBC operations that are used by the
- * JDBCPersistenceAdapter. <p/> sub-classing is encouraged to override the
- * default implementation of methods to account for differences in JDBC Driver
- * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using
- * the getBytes()/setBytes() operations. <p/> The databases/JDBC drivers that
- * use this adapter are:
+ * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
+ * encouraged to override the default implementation of methods to account for differences in JDBC Driver
+ * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
+ * The databases/JDBC drivers that use this adapter are:
  * <ul>
  * <li></li>
  * </ul>
@@ -51,10 +52,10 @@
  * @version $Revision: 1.10 $
  */
 public class DefaultJDBCAdapter implements JDBCAdapter {
-
     private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
     protected Statements statements;
     protected boolean batchStatments = true;
+    private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
         s.setBytes(index, data);
@@ -75,16 +76,15 @@
             boolean alreadyExists = false;
             ResultSet rs = null;
             try {
-                rs = c.getConnection().getMetaData().getTables(null, null,
-                                                               statements.getFullMessageTableName(),
-                                                               new String[] {"TABLE"});
+                rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
+                        new String[] { "TABLE" });
                 alreadyExists = rs.next();
             } catch (Throwable ignore) {
             } finally {
                 close(rs);
             }
             s = c.getConnection().createStatement();
-            String[] createStatments = statements.getCreateSchemaStatements();
+            String[] createStatments = this.statements.getCreateSchemaStatements();
             for (int i = 0; i < createStatments.length; i++) {
                 // This will fail usually since the tables will be
                 // created already.
@@ -93,13 +93,13 @@
                     s.execute(createStatments[i]);
                 } catch (SQLException e) {
                     if (alreadyExists) {
-                        LOG.debug("Could not create JDBC tables; The message table already existed."
-                                  + " Failure was: " + createStatments[i] + " Message: " + e.getMessage()
-                                  + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+                        LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
+                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+                                + " Vendor code: " + e.getErrorCode());
                     } else {
                         LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
-                                 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: "
-                                 + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+                                + " Vendor code: " + e.getErrorCode());
                         JDBCPersistenceAdapter.log("Failure details: ", e);
                     }
                 }
@@ -117,7 +117,7 @@
         Statement s = null;
         try {
             s = c.getConnection().createStatement();
-            String[] dropStatments = statements.getDropSchemaStatements();
+            String[] dropStatments = this.statements.getDropSchemaStatements();
             for (int i = 0; i < dropStatments.length; i++) {
                 // This will fail usually since the tables will be
                 // created already.
@@ -125,9 +125,9 @@
                     LOG.debug("Executing SQL: " + dropStatments[i]);
                     s.execute(dropStatments[i]);
                 } catch (SQLException e) {
-                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
-                             + dropStatments[i] + " Message: " + e.getMessage() + " SQLState: "
-                             + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
+                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
+                            + e.getErrorCode());
                     JDBCPersistenceAdapter.log("Failure details: ", e);
                 }
             }
@@ -144,7 +144,7 @@
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
             rs = s.executeQuery();
             long seq1 = 0;
             if (rs.next()) {
@@ -152,7 +152,7 @@
             }
             rs.close();
             s.close();
-            s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
             rs = s.executeQuery();
             long seq2 = 0;
             if (rs.next()) {
@@ -165,13 +165,13 @@
         }
     }
 
-    public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
-                             byte[] data, long expiration) throws SQLException, IOException {
+    public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
+            long expiration) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
-                if (batchStatments) {
+                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
+                if (this.batchStatments) {
                     c.setAddMessageStatement(s);
                 }
             }
@@ -181,28 +181,27 @@
             s.setString(4, destination.getQualifiedName());
             s.setLong(5, expiration);
             setBinaryData(s, 6, data);
-            if (batchStatments) {
+            if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
                 throw new SQLException("Failed add a message");
             }
         } finally {
-            if (!batchStatments) {
-                if (s!=null) {
+            if (!this.batchStatments) {
+                if (s != null) {
                     s.close();
                 }
             }
         }
     }
 
-    public void doAddMessageReference(TransactionContext c, MessageId messageID,
-                                      ActiveMQDestination destination, long expirationTime, String messageRef)
-        throws SQLException, IOException {
+    public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
+            long expirationTime, String messageRef) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
-                if (batchStatments) {
+                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
+                if (this.batchStatments) {
                     c.setAddMessageStatement(s);
                 }
             }
@@ -212,24 +211,23 @@
             s.setString(4, destination.getQualifiedName());
             s.setLong(5, expirationTime);
             s.setString(6, messageRef);
-            if (batchStatments) {
+            if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
                 throw new SQLException("Failed add a message");
             }
         } finally {
-            if (!batchStatments) {
+            if (!this.batchStatments) {
                 s.close();
             }
         }
     }
 
-    public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
-        IOException {
+    public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
             s.setString(1, messageID.getProducerId().toString());
             s.setLong(2, messageID.getProducerSequenceId());
             rs = s.executeQuery();
@@ -247,7 +245,7 @@
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
             s.setLong(1, seq);
             rs = s.executeQuery();
             if (!rs.next()) {
@@ -264,7 +262,7 @@
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
             s.setLong(1, seq);
             rs = s.executeQuery();
             if (!rs.next()) {
@@ -281,33 +279,33 @@
         PreparedStatement s = c.getRemovedMessageStatement();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
-                if (batchStatments) {
+                s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatment());
+                if (this.batchStatments) {
                     c.setRemovedMessageStatement(s);
                 }
             }
             s.setLong(1, seq);
-            if (batchStatments) {
+            if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
                 throw new SQLException("Failed to remove message");
             }
         } finally {
-            if (!batchStatments) {
+            if (!this.batchStatments) {
                 s.close();
             }
         }
     }
 
-    public void doRecover(TransactionContext c, ActiveMQDestination destination,
-                          JDBCMessageRecoveryListener listener) throws Exception {
+    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
+            throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
             s.setString(1, destination.getQualifiedName());
             rs = s.executeQuery();
-            if (statements.isUseExternalMessageReferences()) {
+            if (this.statements.isUseExternalMessageReferences()) {
                 while (rs.next()) {
                     if (!listener.recoverMessageReference(rs.getString(2))) {
                         break;
@@ -327,12 +325,12 @@
     }
 
     public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
-                             String subscriptionName, long seq) throws SQLException, IOException {
+            String subscriptionName, long seq) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
-                if (batchStatments) {
+                s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
+                if (this.batchStatments) {
                     c.setUpdateLastAckStatement(s);
                 }
             }
@@ -340,32 +338,31 @@
             s.setString(2, destination.getQualifiedName());
             s.setString(3, clientId);
             s.setString(4, subscriptionName);
-            if (batchStatments) {
+            if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
                 throw new SQLException("Failed add a message");
             }
         } finally {
-            if (!batchStatments) {
+            if (!this.batchStatments) {
                 s.close();
             }
         }
     }
 
     public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
-                                      String subscriptionName, JDBCMessageRecoveryListener listener)
-        throws Exception {
+            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
         // dumpTables(c,
         // destination.getQualifiedName(),clientId,subscriptionName);
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
             rs = s.executeQuery();
-            if (statements.isUseExternalMessageReferences()) {
+            if (this.statements.isUseExternalMessageReferences()) {
                 while (rs.next()) {
                     if (!listener.recoverMessageReference(rs.getString(2))) {
                         break;
@@ -385,12 +382,11 @@
     }
 
     public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
-                                      String subscriptionName, long seq, int maxReturned,
-                                      JDBCMessageRecoveryListener listener) throws Exception {
+            String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
             s.setMaxRows(maxReturned);
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
@@ -398,7 +394,7 @@
             s.setLong(4, seq);
             rs = s.executeQuery();
             int count = 0;
-            if (statements.isUseExternalMessageReferences()) {
+            if (this.statements.isUseExternalMessageReferences()) {
                 while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
@@ -422,13 +418,12 @@
     }
 
     public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
-                                                  String clientId, String subscriptionName)
-        throws SQLException, IOException {
+            String clientId, String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
         try {
-            s = c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
+            s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -444,18 +439,23 @@
     }
 
     /**
-     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection,
-     *      java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
+     * @param c 
+     * @param info 
+     * @param retroactive 
+     * @throws SQLException 
+     * @throws IOException 
+     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
+     *      org.apache.activemq.service.SubscriptionInfo)
      */
     public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
-        throws SQLException, IOException {
+            throws SQLException, IOException {
         // dumpTables(c, destination.getQualifiedName(), clientId,
         // subscriptionName);
         PreparedStatement s = null;
         try {
             long lastMessageId = -1;
             if (!retroactive) {
-                s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
                 ResultSet rs = null;
                 try {
                     rs = s.executeQuery();
@@ -467,7 +467,7 @@
                     close(s);
                 }
             }
-            s = c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
+            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
             s.setString(1, info.getDestination().getQualifiedName());
             s.setString(2, info.getClientId());
             s.setString(3, info.getSubscriptionName());
@@ -483,12 +483,11 @@
     }
 
     public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
-                                                 String clientId, String subscriptionName)
-        throws SQLException, IOException {
+            String clientId, String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -501,8 +500,8 @@
             subscription.setClientId(clientId);
             subscription.setSubscriptionName(subscriptionName);
             subscription.setSelector(rs.getString(1));
-            subscription.setSubscribedDestination(ActiveMQDestination
-                .createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
+            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
+                    ActiveMQDestination.QUEUE_TYPE));
             return subscription;
         } finally {
             close(rs);
@@ -511,11 +510,11 @@
     }
 
     public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
-        throws SQLException, IOException {
+            throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
             s.setString(1, destination.getQualifiedName());
             rs = s.executeQuery();
             ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
@@ -525,8 +524,8 @@
                 subscription.setSelector(rs.getString(1));
                 subscription.setSubscriptionName(rs.getString(2));
                 subscription.setClientId(rs.getString(3));
-                subscription.setSubscribedDestination(ActiveMQDestination
-                    .createDestination(rs.getString(4), ActiveMQDestination.QUEUE_TYPE));
+                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
+                        ActiveMQDestination.QUEUE_TYPE));
                 rc.add(subscription);
             }
             return rc.toArray(new SubscriptionInfo[rc.size()]);
@@ -536,15 +535,15 @@
         }
     }
 
-    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName)
-        throws SQLException, IOException {
+    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
+            IOException {
         PreparedStatement s = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
+            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
             s.setString(1, destinationName.getQualifiedName());
             s.executeUpdate();
             s.close();
-            s = c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
+            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
             s.setString(1, destinationName.getQualifiedName());
             s.executeUpdate();
         } finally {
@@ -553,10 +552,10 @@
     }
 
     public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
-                                     String subscriptionName) throws SQLException, IOException {
+            String subscriptionName) throws SQLException, IOException {
         PreparedStatement s = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
+            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -569,8 +568,8 @@
     public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
         PreparedStatement s = null;
         try {
-            LOG.debug("Executing SQL: " + statements.getDeleteOldMessagesStatement());
-            s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
+            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
+            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
             s.setLong(1, System.currentTimeMillis());
             int i = s.executeUpdate();
             LOG.debug("Deleted " + i + " old message(s).");
@@ -579,16 +578,13 @@
         }
     }
 
-    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
-                                                         ActiveMQDestination destination, String clientId,
-                                                         String subscriberName) throws SQLException,
-        IOException {
+    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
+            String clientId, String subscriberName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         long result = -1;
         try {
-            s = c.getConnection()
-                .prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
+            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriberName);
@@ -624,7 +620,7 @@
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
             rs = s.executeQuery();
             while (rs.next()) {
                 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
@@ -636,34 +632,50 @@
         return rc;
     }
 
+    /**
+     * @return true if batchStements
+     */
     public boolean isBatchStatments() {
-        return batchStatments;
+        return this.batchStatments;
     }
 
+    /**
+     * @param batchStatments
+     */
     public void setBatchStatments(boolean batchStatments) {
         this.batchStatments = batchStatments;
     }
 
     public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
-        statements.setUseExternalMessageReferences(useExternalMessageReferences);
+        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
     }
 
+    /**
+     * @return the statements
+     */
     public Statements getStatements() {
-        return statements;
+        return this.statements;
     }
 
     public void setStatements(Statements statements) {
         this.statements = statements;
     }
 
-    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,
-                                                             ActiveMQDestination destination,
-                                                             String clientId, String subscriberName)
-        throws SQLException, IOException {
+    /**
+     * @param c
+     * @param destination
+     * @param clientId
+     * @param subscriberName
+     * @return
+     * @throws SQLException
+     * @throws IOException
+     */
+    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
+            String clientId, String subscriberName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
+            s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriberName);
@@ -679,12 +691,12 @@
     }
 
     public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
-        IOException {
+            IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
         try {
-            s = c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
+            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
             s.setString(1, destination.getQualifiedName());
             rs = s.executeQuery();
             if (rs.next()) {
@@ -698,35 +710,56 @@
     }
 
     public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
-                                      int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+            int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
+        long id = 0;
+        List<Long> cleanupIds = new ArrayList<Long>();
+        int index = 0;
         try {
-            s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
-            s.setMaxRows(maxReturned);
+            s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
+            s.setMaxRows(maxReturned * 2);
             s.setString(1, destination.getQualifiedName());
-            s.setLong(2, nextSeq);
+            s.setLong(2, nextSeq - maxReturned);
             rs = s.executeQuery();
             int count = 0;
-            if (statements.isUseExternalMessageReferences()) {
+            if (this.statements.isUseExternalMessageReferences()) {
                 while (rs.next() && count < maxReturned) {
+                    id = rs.getLong(1);
+                    if (this.lastRecoveredMessagesIds.contains(id)) {
+                        // this message was already recovered
+                        cleanupIds.add(id);
+                        continue;
+                    }
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
+                        this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
-                        break;
                     }
                 }
             } else {
                 while (rs.next() && count < maxReturned) {
+                    id = rs.getLong(1);
+                    if (this.lastRecoveredMessagesIds.contains(id)) {
+                        // this message was already recovered
+                        cleanupIds.add(id);
+                        continue;
+                    }
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
+                        this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
-                        break;
                     }
                 }
             }
+            // not cleanup the list of recovered messages
+            index = 0;
+            Iterator<Long> it = cleanupIds.iterator();
+            while (it.hasNext() && index < count) {
+                this.lastRecoveredMessagesIds.remove(it.next());
+            }
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
@@ -735,35 +768,26 @@
         }
     }
     /*
-     * Useful for debugging. public void dumpTables(Connection c, String
-     * destinationName, String clientId, String subscriptionName) throws
-     * SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
-     * printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
-     * PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID
-     * FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND
-     * D.CLIENT_ID=? AND D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID >
-     * D.LAST_ACKED_ID" +" ORDER BY M.ID"); s.setString(1,destinationName);
-     * s.setString(2,clientId); s.setString(3,subscriptionName);
+     * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
+     * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
+     * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
+     * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
+     * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
+     * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
      * printQuery(s,System.out); }
      * 
-     * public void dumpTables(Connection c) throws SQLException { printQuery(c,
-     * "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from
-     * ACTIVEMQ_ACKS", System.out); }
+     * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
+     * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
      * 
-     * private void printQuery(Connection c, String query, PrintStream out)
-     * throws SQLException { printQuery(c.prepareStatement(query), out); }
+     * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
+     * printQuery(c.prepareStatement(query), out); }
      * 
-     * private void printQuery(PreparedStatement s, PrintStream out) throws
-     * SQLException {
+     * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
      * 
-     * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData
-     * metaData = set.getMetaData(); for( int i=1; i<=
-     * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||");
-     * out.print(metaData.getColumnName(i)+"||"); } out.println();
-     * while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) {
-     * if(i==1) out.print("|"); out.print(set.getString(i)+"|"); }
-     * out.println(); } } finally { try { set.close(); } catch (Throwable
-     * ignore) {} try { s.close(); } catch (Throwable ignore) {} } }
+     * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
+     * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
+     * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
+     * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
+     * try { s.close(); } catch (Throwable ignore) {} } }
      */
-
 }