You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/29 00:13:34 UTC
svn commit: r729803 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/BrokerService.java store/jdbc/adapter/DefaultJDBCAdapter.java
Author: rajdavies
Date: Sun Dec 28 15:13:32 2008
New Revision: 729803
URL: http://svn.apache.org/viewvc?rev=729803&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1918
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=729803&r1=729802&r2=729803&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sun Dec 28 15:13:32 2008
@@ -21,8 +21,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.security.KeyManagementException;
-import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -32,13 +30,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.TrustManager;
-
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@@ -83,7 +77,6 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=729803&r1=729802&r2=729803&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Sun Dec 28 15:13:32 2008
@@ -23,7 +23,10 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
+import java.util.TreeSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -36,12 +39,10 @@
import org.apache.commons.logging.LogFactory;
/**
- * Implements all the default JDBC operations that are used by the
- * JDBCPersistenceAdapter. <p/> sub-classing is encouraged to override the
- * default implementation of methods to account for differences in JDBC Driver
- * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using
- * the getBytes()/setBytes() operations. <p/> The databases/JDBC drivers that
- * use this adapter are:
+ * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
+ * encouraged to override the default implementation of methods to account for differences in JDBC Driver
+ * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
+ * The databases/JDBC drivers that use this adapter are:
* <ul>
* <li></li>
* </ul>
@@ -51,10 +52,10 @@
* @version $Revision: 1.10 $
*/
public class DefaultJDBCAdapter implements JDBCAdapter {
-
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements;
protected boolean batchStatments = true;
+ private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@@ -75,16 +76,15 @@
boolean alreadyExists = false;
ResultSet rs = null;
try {
- rs = c.getConnection().getMetaData().getTables(null, null,
- statements.getFullMessageTableName(),
- new String[] {"TABLE"});
+ rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
+ new String[] { "TABLE" });
alreadyExists = rs.next();
} catch (Throwable ignore) {
} finally {
close(rs);
}
s = c.getConnection().createStatement();
- String[] createStatments = statements.getCreateSchemaStatements();
+ String[] createStatments = this.statements.getCreateSchemaStatements();
for (int i = 0; i < createStatments.length; i++) {
// This will fail usually since the tables will be
// created already.
@@ -93,13 +93,13 @@
s.execute(createStatments[i]);
} catch (SQLException e) {
if (alreadyExists) {
- LOG.debug("Could not create JDBC tables; The message table already existed."
- + " Failure was: " + createStatments[i] + " Message: " + e.getMessage()
- + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+ LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
+ + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ + " Vendor code: " + e.getErrorCode());
} else {
LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
- + createStatments[i] + " Message: " + e.getMessage() + " SQLState: "
- + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+ + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ + " Vendor code: " + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e);
}
}
@@ -117,7 +117,7 @@
Statement s = null;
try {
s = c.getConnection().createStatement();
- String[] dropStatments = statements.getDropSchemaStatements();
+ String[] dropStatments = this.statements.getDropSchemaStatements();
for (int i = 0; i < dropStatments.length; i++) {
// This will fail usually since the tables will be
// created already.
@@ -125,9 +125,9 @@
LOG.debug("Executing SQL: " + dropStatments[i]);
s.execute(dropStatments[i]);
} catch (SQLException e) {
- LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
- + dropStatments[i] + " Message: " + e.getMessage() + " SQLState: "
- + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+ LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
+ + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
+ + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e);
}
}
@@ -144,7 +144,7 @@
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
rs = s.executeQuery();
long seq1 = 0;
if (rs.next()) {
@@ -152,7 +152,7 @@
}
rs.close();
s.close();
- s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
rs = s.executeQuery();
long seq2 = 0;
if (rs.next()) {
@@ -165,13 +165,13 @@
}
}
- public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
- byte[] data, long expiration) throws SQLException, IOException {
+ public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
+ long expiration) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
try {
if (s == null) {
- s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
- if (batchStatments) {
+ s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
+ if (this.batchStatments) {
c.setAddMessageStatement(s);
}
}
@@ -181,28 +181,27 @@
s.setString(4, destination.getQualifiedName());
s.setLong(5, expiration);
setBinaryData(s, 6, data);
- if (batchStatments) {
+ if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message");
}
} finally {
- if (!batchStatments) {
- if (s!=null) {
+ if (!this.batchStatments) {
+ if (s != null) {
s.close();
}
}
}
}
- public void doAddMessageReference(TransactionContext c, MessageId messageID,
- ActiveMQDestination destination, long expirationTime, String messageRef)
- throws SQLException, IOException {
+ public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
+ long expirationTime, String messageRef) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
try {
if (s == null) {
- s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
- if (batchStatments) {
+ s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
+ if (this.batchStatments) {
c.setAddMessageStatement(s);
}
}
@@ -212,24 +211,23 @@
s.setString(4, destination.getQualifiedName());
s.setLong(5, expirationTime);
s.setString(6, messageRef);
- if (batchStatments) {
+ if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message");
}
} finally {
- if (!batchStatments) {
+ if (!this.batchStatments) {
s.close();
}
}
}
- public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
- IOException {
+ public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString());
s.setLong(2, messageID.getProducerSequenceId());
rs = s.executeQuery();
@@ -247,7 +245,7 @@
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq);
rs = s.executeQuery();
if (!rs.next()) {
@@ -264,7 +262,7 @@
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq);
rs = s.executeQuery();
if (!rs.next()) {
@@ -281,33 +279,33 @@
PreparedStatement s = c.getRemovedMessageStatement();
try {
if (s == null) {
- s = c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
- if (batchStatments) {
+ s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatment());
+ if (this.batchStatments) {
c.setRemovedMessageStatement(s);
}
}
s.setLong(1, seq);
- if (batchStatments) {
+ if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new SQLException("Failed to remove message");
}
} finally {
- if (!batchStatments) {
+ if (!this.batchStatments) {
s.close();
}
}
}
- public void doRecover(TransactionContext c, ActiveMQDestination destination,
- JDBCMessageRecoveryListener listener) throws Exception {
+ public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
+ throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
s.setString(1, destination.getQualifiedName());
rs = s.executeQuery();
- if (statements.isUseExternalMessageReferences()) {
+ if (this.statements.isUseExternalMessageReferences()) {
while (rs.next()) {
if (!listener.recoverMessageReference(rs.getString(2))) {
break;
@@ -327,12 +325,12 @@
}
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, long seq) throws SQLException, IOException {
+ String subscriptionName, long seq) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement();
try {
if (s == null) {
- s = c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
- if (batchStatments) {
+ s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
+ if (this.batchStatments) {
c.setUpdateLastAckStatement(s);
}
}
@@ -340,32 +338,31 @@
s.setString(2, destination.getQualifiedName());
s.setString(3, clientId);
s.setString(4, subscriptionName);
- if (batchStatments) {
+ if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message");
}
} finally {
- if (!batchStatments) {
+ if (!this.batchStatments) {
s.close();
}
}
}
public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, JDBCMessageRecoveryListener listener)
- throws Exception {
+ String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
// dumpTables(c,
// destination.getQualifiedName(),clientId,subscriptionName);
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
rs = s.executeQuery();
- if (statements.isUseExternalMessageReferences()) {
+ if (this.statements.isUseExternalMessageReferences()) {
while (rs.next()) {
if (!listener.recoverMessageReference(rs.getString(2))) {
break;
@@ -385,12 +382,11 @@
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, long seq, int maxReturned,
- JDBCMessageRecoveryListener listener) throws Exception {
+ String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s.setMaxRows(maxReturned);
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
@@ -398,7 +394,7 @@
s.setLong(4, seq);
rs = s.executeQuery();
int count = 0;
- if (statements.isUseExternalMessageReferences()) {
+ if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned) {
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
@@ -422,13 +418,12 @@
}
public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
- String clientId, String subscriptionName)
- throws SQLException, IOException {
+ String clientId, String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
try {
- s = c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
+ s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@@ -444,18 +439,23 @@
}
/**
- * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection,
- * java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
+ * @param c
+ * @param info
+ * @param retroactive
+ * @throws SQLException
+ * @throws IOException
+ * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
+ * org.apache.activemq.service.SubscriptionInfo)
*/
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
- throws SQLException, IOException {
+ throws SQLException, IOException {
// dumpTables(c, destination.getQualifiedName(), clientId,
// subscriptionName);
PreparedStatement s = null;
try {
long lastMessageId = -1;
if (!retroactive) {
- s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
ResultSet rs = null;
try {
rs = s.executeQuery();
@@ -467,7 +467,7 @@
close(s);
}
}
- s = c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
+ s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
s.setString(1, info.getDestination().getQualifiedName());
s.setString(2, info.getClientId());
s.setString(3, info.getSubscriptionName());
@@ -483,12 +483,11 @@
}
public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
- String clientId, String subscriptionName)
- throws SQLException, IOException {
+ String clientId, String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@@ -501,8 +500,8 @@
subscription.setClientId(clientId);
subscription.setSubscriptionName(subscriptionName);
subscription.setSelector(rs.getString(1));
- subscription.setSubscribedDestination(ActiveMQDestination
- .createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
+ subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
+ ActiveMQDestination.QUEUE_TYPE));
return subscription;
} finally {
close(rs);
@@ -511,11 +510,11 @@
}
public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
- throws SQLException, IOException {
+ throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
s.setString(1, destination.getQualifiedName());
rs = s.executeQuery();
ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
@@ -525,8 +524,8 @@
subscription.setSelector(rs.getString(1));
subscription.setSubscriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3));
- subscription.setSubscribedDestination(ActiveMQDestination
- .createDestination(rs.getString(4), ActiveMQDestination.QUEUE_TYPE));
+ subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
+ ActiveMQDestination.QUEUE_TYPE));
rc.add(subscription);
}
return rc.toArray(new SubscriptionInfo[rc.size()]);
@@ -536,15 +535,15 @@
}
}
- public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName)
- throws SQLException, IOException {
+ public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
+ IOException {
PreparedStatement s = null;
try {
- s = c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
+ s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
s.close();
- s = c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
+ s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
} finally {
@@ -553,10 +552,10 @@
}
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName) throws SQLException, IOException {
+ String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
try {
- s = c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
+ s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@@ -569,8 +568,8 @@
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
try {
- LOG.debug("Executing SQL: " + statements.getDeleteOldMessagesStatement());
- s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
+ LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
+ s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
s.setLong(1, System.currentTimeMillis());
int i = s.executeUpdate();
LOG.debug("Deleted " + i + " old message(s).");
@@ -579,16 +578,13 @@
}
}
- public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
- ActiveMQDestination destination, String clientId,
- String subscriberName) throws SQLException,
- IOException {
+ public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
long result = -1;
try {
- s = c.getConnection()
- .prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
+ s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriberName);
@@ -624,7 +620,7 @@
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
+ s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
rs = s.executeQuery();
while (rs.next()) {
rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
@@ -636,34 +632,50 @@
return rc;
}
+ /**
+ * @return true if batchStements
+ */
public boolean isBatchStatments() {
- return batchStatments;
+ return this.batchStatments;
}
+ /**
+ * @param batchStatments
+ */
public void setBatchStatments(boolean batchStatments) {
this.batchStatments = batchStatments;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
- statements.setUseExternalMessageReferences(useExternalMessageReferences);
+ this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
}
+ /**
+ * @return the statements
+ */
public Statements getStatements() {
- return statements;
+ return this.statements;
}
public void setStatements(Statements statements) {
this.statements = statements;
}
- public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,
- ActiveMQDestination destination,
- String clientId, String subscriberName)
- throws SQLException, IOException {
+ /**
+ * @param c
+ * @param destination
+ * @param clientId
+ * @param subscriberName
+ * @return
+ * @throws SQLException
+ * @throws IOException
+ */
+ public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
- s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
+ s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriberName);
@@ -679,12 +691,12 @@
}
public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
- IOException {
+ IOException {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
try {
- s = c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
+ s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
s.setString(1, destination.getQualifiedName());
rs = s.executeQuery();
if (rs.next()) {
@@ -698,35 +710,56 @@
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
- int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+ int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
+ long id = 0;
+ List<Long> cleanupIds = new ArrayList<Long>();
+ int index = 0;
try {
- s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
- s.setMaxRows(maxReturned);
+ s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
+ s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
- s.setLong(2, nextSeq);
+ s.setLong(2, nextSeq - maxReturned);
rs = s.executeQuery();
int count = 0;
- if (statements.isUseExternalMessageReferences()) {
+ if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned) {
+ id = rs.getLong(1);
+ if (this.lastRecoveredMessagesIds.contains(id)) {
+ // this message was already recovered
+ cleanupIds.add(id);
+ continue;
+ }
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
+ this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
- break;
}
}
} else {
while (rs.next() && count < maxReturned) {
+ id = rs.getLong(1);
+ if (this.lastRecoveredMessagesIds.contains(id)) {
+ // this message was already recovered
+ cleanupIds.add(id);
+ continue;
+ }
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
+ this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
- break;
}
}
}
+ // not cleanup the list of recovered messages
+ index = 0;
+ Iterator<Long> it = cleanupIds.iterator();
+ while (it.hasNext() && index < count) {
+ this.lastRecoveredMessagesIds.remove(it.next());
+ }
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -735,35 +768,26 @@
}
}
/*
- * Useful for debugging. public void dumpTables(Connection c, String
- * destinationName, String clientId, String subscriptionName) throws
- * SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
- * printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
- * PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID
- * FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND
- * D.CLIENT_ID=? AND D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID >
- * D.LAST_ACKED_ID" +" ORDER BY M.ID"); s.setString(1,destinationName);
- * s.setString(2,clientId); s.setString(3,subscriptionName);
+ * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
+ * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
+ * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
+ * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
+ * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
+ * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
* printQuery(s,System.out); }
*
- * public void dumpTables(Connection c) throws SQLException { printQuery(c,
- * "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from
- * ACTIVEMQ_ACKS", System.out); }
+ * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
+ * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
*
- * private void printQuery(Connection c, String query, PrintStream out)
- * throws SQLException { printQuery(c.prepareStatement(query), out); }
+ * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
+ * printQuery(c.prepareStatement(query), out); }
*
- * private void printQuery(PreparedStatement s, PrintStream out) throws
- * SQLException {
+ * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
*
- * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData
- * metaData = set.getMetaData(); for( int i=1; i<=
- * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||");
- * out.print(metaData.getColumnName(i)+"||"); } out.println();
- * while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) {
- * if(i==1) out.print("|"); out.print(set.getString(i)+"|"); }
- * out.println(); } } finally { try { set.close(); } catch (Throwable
- * ignore) {} try { s.close(); } catch (Throwable ignore) {} } }
+ * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
+ * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
+ * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
+ * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
+ * try { s.close(); } catch (Throwable ignore) {} } }
*/
-
}