You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [17/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jm...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Aug 8 11:56:59 2007
@@ -18,6 +18,7 @@
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
@@ -38,83 +39,82 @@
*
* @version $Revision: 1.13 $
*/
-public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore{
+public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
- private static final Log log=LogFactory.getLog(AMQTopicMessageStore.class);
+ private static final Log log = LogFactory.getLog(AMQTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
- private HashMap<SubscriptionKey,MessageId> ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
+ private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
- public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore,
- ActiveMQTopic destinationName){
- super(adapter,topicReferenceStore,destinationName);
- this.topicReferenceStore=topicReferenceStore;
+ public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
+ super(adapter, topicReferenceStore, destinationName);
+ this.topicReferenceStore = topicReferenceStore;
}
- public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
- throws Exception{
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
flush();
- topicReferenceStore.recoverSubscription(clientId,subscriptionName,new RecoveryListenerAdapter(this,listener));
+ topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
}
- public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
- final MessageRecoveryListener listener) throws Exception{
- RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
- topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
- if(recoveryListener.size()==0){
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception {
+ RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
+ topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
+ if (recoveryListener.size() == 0) {
flush();
- topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
+ topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
}
}
- public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
- return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
}
- public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
- throws IOException{
- topicReferenceStore.addSubsciption(subscriptionInfo,retroactive);
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+ topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
}
/**
*/
- public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
- throws IOException{
- final boolean debug=log.isDebugEnabled();
- JournalTopicAck ack=new JournalTopicAck();
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+ final boolean debug = log.isDebugEnabled();
+ JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
- ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
- final Location location=peristenceAdapter.writeCommand(ack,false);
- final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
- if(!context.isInTransaction()){
- if(debug)
- log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
- acknowledge(messageId,location,key);
- }else{
- if(debug)
- log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
- synchronized(this){
+ ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
+ final Location location = peristenceAdapter.writeCommand(ack, false);
+ final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ if (!context.isInTransaction()) {
+ if (debug) {
+ log.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
+ }
+ acknowledge(messageId, location, key);
+ } else {
+ if (debug) {
+ log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
+ }
+ synchronized (this) {
inFlightTxLocations.add(location);
}
- transactionStore.acknowledge(this,ack,location);
- context.getTransaction().addSynchronization(new Synchronization(){
+ transactionStore.acknowledge(this, ack, location);
+ context.getTransaction().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception{
- if(debug)
- log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
- synchronized(AMQTopicMessageStore.this){
+ public void afterCommit() throws Exception {
+ if (debug) {
+ log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
+ }
+ synchronized (AMQTopicMessageStore.this) {
inFlightTxLocations.remove(location);
- acknowledge(messageId,location,key);
+ acknowledge(messageId, location, key);
}
}
- public void afterRollback() throws Exception{
- if(debug)
- log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
- synchronized(AMQTopicMessageStore.this){
+ public void afterRollback() throws Exception {
+ if (debug) {
+ log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
+ }
+ synchronized (AMQTopicMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
@@ -122,17 +122,15 @@
}
}
- public boolean replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,
- MessageId messageId){
- try{
- SubscriptionInfo sub=topicReferenceStore.lookupSubscription(clientId,subscritionName);
- if(sub!=null){
- topicReferenceStore.acknowledge(context,clientId,subscritionName,messageId);
+ public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+ try {
+ SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
+ if (sub != null) {
+ topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
return true;
}
- }catch(Throwable e){
- log.debug("Could not replay acknowledge for message '"+messageId
- +"'. Message may have already been acknowledged. reason: "+e);
+ } catch (Throwable e) {
+ log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
@@ -143,26 +141,27 @@
* @param key
* @throws InterruptedIOException
*/
- protected void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
- synchronized(this){
- lastLocation=location;
- ackedLastAckLocations.put(key,messageId);
+ protected void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
+ synchronized (this) {
+ lastLocation = location;
+ ackedLastAckLocations.put(key, messageId);
}
- try{
+ try {
asyncWriteTask.wakeup();
- }catch(InterruptedException e){
+ } catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
- @Override protected Location doAsyncWrite() throws IOException{
- final HashMap<SubscriptionKey,MessageId> cpAckedLastAckLocations;
+ @Override
+ protected Location doAsyncWrite() throws IOException {
+ final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
- synchronized(this){
- cpAckedLastAckLocations=this.ackedLastAckLocations;
- this.ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
+ synchronized (this) {
+ cpAckedLastAckLocations = this.ackedLastAckLocations;
+ this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
}
- Location location=super.doAsyncWrite();
+ Location location = super.doAsyncWrite();
if (cpAckedLastAckLocations != null) {
transactionTemplate.run(new Callback() {
@@ -172,8 +171,7 @@
while (iterator.hasNext()) {
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
- topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
- subscriptionKey.subscriptionName, identity);
+ topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
}
}
});
@@ -184,24 +182,24 @@
/**
* @return Returns the longTermStore.
*/
- public TopicReferenceStore getTopicReferenceStore(){
+ public TopicReferenceStore getTopicReferenceStore() {
return topicReferenceStore;
}
- public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
- topicReferenceStore.deleteSubscription(clientId,subscriptionName);
+ public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ topicReferenceStore.deleteSubscription(clientId, subscriptionName);
}
- public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return topicReferenceStore.getAllSubscriptions();
}
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ public int getMessageCount(String clientId, String subscriberName) throws IOException {
flush();
- return topicReferenceStore.getMessageCount(clientId,subscriberName);
+ return topicReferenceStore.getMessageCount(clientId, subscriberName);
}
- public void resetBatching(String clientId,String subscriptionName){
- topicReferenceStore.resetBatching(clientId,subscriptionName);
+ public void resetBatching(String clientId, String subscriptionName) {
+ topicReferenceStore.resetBatching(clientId, subscriptionName);
}
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Wed Aug 8 11:56:59 2007
@@ -21,7 +21,9 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+
import javax.transaction.xa.XAException;
+
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
@@ -32,34 +34,34 @@
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
-
/**
*/
-public class AMQTransactionStore implements TransactionStore{
+public class AMQTransactionStore implements TransactionStore {
private final AMQPersistenceAdapter peristenceAdapter;
- Map<TransactionId,AMQTx> inflightTransactions=new LinkedHashMap<TransactionId,AMQTx>();
- Map<TransactionId,AMQTx> preparedTransactions=new LinkedHashMap<TransactionId,AMQTx>();
+ Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>();
+ Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>();
private boolean doingRecover;
- public AMQTransactionStore(AMQPersistenceAdapter adapter){
- this.peristenceAdapter=adapter;
+ public AMQTransactionStore(AMQPersistenceAdapter adapter) {
+ this.peristenceAdapter = adapter;
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void prepare(TransactionId txid) throws IOException{
- AMQTx tx=null;
- synchronized(inflightTransactions){
- tx=inflightTransactions.remove(txid);
+ public void prepare(TransactionId txid) throws IOException {
+ AMQTx tx = null;
+ synchronized (inflightTransactions) {
+ tx = inflightTransactions.remove(txid);
}
- if(tx==null)
+ if (tx == null) {
return;
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
- synchronized(preparedTransactions){
- preparedTransactions.put(txid,tx);
+ }
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
+ synchronized (preparedTransactions) {
+ preparedTransactions.put(txid, tx);
}
}
@@ -67,26 +69,27 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void replayPrepare(TransactionId txid) throws IOException{
- AMQTx tx=null;
- synchronized(inflightTransactions){
- tx=inflightTransactions.remove(txid);
+ public void replayPrepare(TransactionId txid) throws IOException {
+ AMQTx tx = null;
+ synchronized (inflightTransactions) {
+ tx = inflightTransactions.remove(txid);
}
- if(tx==null)
+ if (tx == null) {
return;
- synchronized(preparedTransactions){
- preparedTransactions.put(txid,tx);
+ }
+ synchronized (preparedTransactions) {
+ preparedTransactions.put(txid, tx);
}
}
- public AMQTx getTx(TransactionId txid,Location location){
- AMQTx tx=null;
- synchronized(inflightTransactions){
- tx=inflightTransactions.get(txid);
+ public AMQTx getTx(TransactionId txid, Location location) {
+ AMQTx tx = null;
+ synchronized (inflightTransactions) {
+ tx = inflightTransactions.get(txid);
}
- if(tx==null){
- tx=new AMQTx(location);
- inflightTransactions.put(txid,tx);
+ if (tx == null) {
+ tx = new AMQTx(location);
+ inflightTransactions.put(txid, tx);
}
return tx;
}
@@ -95,24 +98,24 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+ public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
AMQTx tx;
- if(wasPrepared){
- synchronized(preparedTransactions){
- tx=preparedTransactions.remove(txid);
+ if (wasPrepared) {
+ synchronized (preparedTransactions) {
+ tx = preparedTransactions.remove(txid);
}
- }else{
- synchronized(inflightTransactions){
- tx=inflightTransactions.remove(txid);
+ } else {
+ synchronized (inflightTransactions) {
+ tx = inflightTransactions.remove(txid);
}
}
- if(tx==null)
+ if (tx == null) {
return;
- if(txid.isXATransaction()){
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
- }else{
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
- true);
+ }
+ if (txid.isXATransaction()) {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true);
+ } else {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true);
}
}
@@ -120,13 +123,13 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public AMQTx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
- if(wasPrepared){
- synchronized(preparedTransactions){
+ public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
+ if (wasPrepared) {
+ synchronized (preparedTransactions) {
return preparedTransactions.remove(txid);
}
- }else{
- synchronized(inflightTransactions){
+ } else {
+ synchronized (inflightTransactions) {
return inflightTransactions.remove(txid);
}
}
@@ -136,21 +139,21 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void rollback(TransactionId txid) throws IOException{
- AMQTx tx=null;
- synchronized(inflightTransactions){
- tx=inflightTransactions.remove(txid);
- }
- if(tx!=null)
- synchronized(preparedTransactions){
- tx=preparedTransactions.remove(txid);
- }
- if(tx!=null){
- if(txid.isXATransaction()){
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
- }else{
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
- true);
+ public void rollback(TransactionId txid) throws IOException {
+ AMQTx tx = null;
+ synchronized (inflightTransactions) {
+ tx = inflightTransactions.remove(txid);
+ }
+ if (tx != null) {
+ synchronized (preparedTransactions) {
+ tx = preparedTransactions.remove(txid);
+ }
+ }
+ if (tx != null) {
+ if (txid.isXATransaction()) {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true);
+ } else {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true);
}
}
}
@@ -159,42 +162,42 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void replayRollback(TransactionId txid) throws IOException{
- boolean inflight=false;
- synchronized(inflightTransactions){
- inflight=inflightTransactions.remove(txid)!=null;
+ public void replayRollback(TransactionId txid) throws IOException {
+ boolean inflight = false;
+ synchronized (inflightTransactions) {
+ inflight = inflightTransactions.remove(txid) != null;
}
- if(inflight){
- synchronized(preparedTransactions){
+ if (inflight) {
+ synchronized (preparedTransactions) {
preparedTransactions.remove(txid);
}
}
}
- public void start() throws Exception{
+ public void start() throws Exception {
}
- public void stop() throws Exception{
+ public void stop() throws Exception {
}
- synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+ synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
// All the in-flight transactions get rolled back..
- synchronized(inflightTransactions){
+ synchronized (inflightTransactions) {
inflightTransactions.clear();
}
- this.doingRecover=true;
- try{
- Map<TransactionId,AMQTx> txs=null;
- synchronized(preparedTransactions){
- txs=new LinkedHashMap<TransactionId,AMQTx>(preparedTransactions);
- }
- for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
- Object txid=iter.next();
- AMQTx tx=txs.get(txid);
- listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
+ this.doingRecover = true;
+ try {
+ Map<TransactionId, AMQTx> txs = null;
+ synchronized (preparedTransactions) {
+ txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions);
+ }
+ for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
+ Object txid = iter.next();
+ AMQTx tx = txs.get(txid);
+ listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
}
- }finally{
- this.doingRecover=false;
+ } finally {
+ this.doingRecover = false;
}
}
@@ -202,69 +205,70 @@
* @param message
* @throws IOException
*/
- void addMessage(AMQMessageStore store,Message message,Location location) throws IOException{
- AMQTx tx=getTx(message.getTransactionId(),location);
- tx.add(store,message,location);
+ void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
+ AMQTx tx = getTx(message.getTransactionId(), location);
+ tx.add(store, message, location);
}
/**
* @param ack
* @throws IOException
*/
- public void removeMessage(AMQMessageStore store,MessageAck ack,Location location) throws IOException{
- AMQTx tx=getTx(ack.getTransactionId(),location);
- tx.add(store,ack);
+ public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
+ AMQTx tx = getTx(ack.getTransactionId(), location);
+ tx.add(store, ack);
}
- public void acknowledge(AMQTopicMessageStore store,JournalTopicAck ack,Location location){
- AMQTx tx=getTx(ack.getTransactionId(),location);
- tx.add(store,ack);
+ public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
+ AMQTx tx = getTx(ack.getTransactionId(), location);
+ tx.add(store, ack);
}
- public Location checkpoint() throws IOException{
+ public Location checkpoint() throws IOException {
// Nothing really to checkpoint.. since, we don't
- // checkpoint tx operations in to long term store until they are committed.
+ // checkpoint tx operations in to long term store until they are
+ // committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
- Location rc=null;
- synchronized(inflightTransactions){
- for(Iterator<AMQTx> iter=inflightTransactions.values().iterator();iter.hasNext();){
- AMQTx tx=iter.next();
- Location location=tx.getLocation();
- if(rc==null||rc.compareTo(location)<0){
- rc=location;
+ Location rc = null;
+ synchronized (inflightTransactions) {
+ for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
+ AMQTx tx = iter.next();
+ Location location = tx.getLocation();
+ if (rc == null || rc.compareTo(location) < 0) {
+ rc = location;
}
}
}
- synchronized(preparedTransactions){
- for(Iterator<AMQTx> iter=preparedTransactions.values().iterator();iter.hasNext();){
- AMQTx tx=iter.next();
- Location location=tx.getLocation();
- if(rc==null||rc.compareTo(location)<0){
- rc=location;
+ synchronized (preparedTransactions) {
+ for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
+ AMQTx tx = iter.next();
+ Location location = tx.getLocation();
+ if (rc == null || rc.compareTo(location) < 0) {
+ rc = location;
}
}
return rc;
}
}
- public boolean isDoingRecover(){
+ public boolean isDoingRecover() {
return doingRecover;
}
/**
* @return the preparedTransactions
*/
- public Map<TransactionId,AMQTx> getPreparedTransactions(){
+ public Map<TransactionId, AMQTx> getPreparedTransactions() {
return this.preparedTransactions;
}
/**
* @param preparedTransactions the preparedTransactions to set
*/
- public void setPreparedTransactions(Map<TransactionId,AMQTx> preparedTransactions){
- if(preparedTransactions!=null){
+ public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) {
+ if (preparedTransactions != null) {
this.preparedTransactions.clear();
this.preparedTransactions.putAll(preparedTransactions);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java Wed Aug 8 11:56:59 2007
@@ -24,56 +24,59 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.Location;
-
/**
*/
/**
* Operations
+ *
* @version $Revision: 1.6 $
*/
-public class AMQTx{
+public class AMQTx {
private final Location location;
- private ArrayList<AMQTxOperation> operations=new ArrayList<AMQTxOperation>();
+ private ArrayList<AMQTxOperation> operations = new ArrayList<AMQTxOperation>();
- public AMQTx(Location location){
- this.location=location;
+ public AMQTx(Location location) {
+ this.location = location;
}
- public void add(AMQMessageStore store,Message msg,Location location){
- operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE,store.getDestination(),msg,location));
+ public void add(AMQMessageStore store, Message msg, Location location) {
+ operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE, store.getDestination(), msg,
+ location));
}
- public void add(AMQMessageStore store,MessageAck ack){
- operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE,store.getDestination(),ack,null));
+ public void add(AMQMessageStore store, MessageAck ack) {
+ operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE, store.getDestination(), ack,
+ null));
}
- public void add(AMQTopicMessageStore store,JournalTopicAck ack){
- operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE,store.getDestination(),ack,null));
+ public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
+ operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE, store.getDestination(), ack,
+ null));
}
- public Message[] getMessages(){
- ArrayList<Object> list=new ArrayList<Object>();
- for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
- AMQTxOperation op=iter.next();
- if(op.getOperationType()==AMQTxOperation.ADD_OPERATION_TYPE){
+ public Message[] getMessages() {
+ ArrayList<Object> list = new ArrayList<Object>();
+ for (Iterator<AMQTxOperation> iter = operations.iterator(); iter.hasNext();) {
+ AMQTxOperation op = iter.next();
+ if (op.getOperationType() == AMQTxOperation.ADD_OPERATION_TYPE) {
list.add(op.getData());
}
}
- Message rc[]=new Message[list.size()];
+ Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
- public MessageAck[] getAcks(){
- ArrayList<Object> list=new ArrayList<Object>();
- for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
- AMQTxOperation op=iter.next();
- if(op.getOperationType()==AMQTxOperation.REMOVE_OPERATION_TYPE){
+ public MessageAck[] getAcks() {
+ ArrayList<Object> list = new ArrayList<Object>();
+ for (Iterator<AMQTxOperation> iter = operations.iterator(); iter.hasNext();) {
+ AMQTxOperation op = iter.next();
+ if (op.getOperationType() == AMQTxOperation.REMOVE_OPERATION_TYPE) {
list.add(op.getData());
}
}
- MessageAck rc[]=new MessageAck[list.size()];
+ MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
@@ -81,17 +84,15 @@
/**
* @return the location
*/
- public Location getLocation(){
+ public Location getLocation() {
return this.location;
}
- public ArrayList<AMQTxOperation> getOperations(){
+ public ArrayList<AMQTxOperation> getOperations() {
return operations;
}
- public void setOperations(ArrayList<AMQTxOperation> operations){
- this.operations=operations;
+ public void setOperations(ArrayList<AMQTxOperation> operations) {
+ this.operations = operations;
}
}
-
-
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java Wed Aug 8 11:56:59 2007
@@ -29,14 +29,13 @@
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
-
/**
*/
public class AMQTxOperation {
- public static final byte ADD_OPERATION_TYPE=0;
- public static final byte REMOVE_OPERATION_TYPE=1;
- public static final byte ACK_OPERATION_TYPE=3;
+ public static final byte ADD_OPERATION_TYPE = 0;
+ public static final byte REMOVE_OPERATION_TYPE = 1;
+ public static final byte ACK_OPERATION_TYPE = 3;
private byte operationType;
private ActiveMQDestination destination;
private Object data;
@@ -44,74 +43,73 @@
public AMQTxOperation() {
}
-
- public AMQTxOperation(byte operationType,ActiveMQDestination destination,Object data,Location location){
- this.operationType=operationType;
- this.destination=destination;
- this.data=data;
- this.location=location;
-
+
+ public AMQTxOperation(byte operationType, ActiveMQDestination destination, Object data, Location location) {
+ this.operationType = operationType;
+ this.destination = destination;
+ this.data = data;
+ this.location = location;
+
}
/**
* @return the data
*/
- public Object getData(){
+ public Object getData() {
return this.data;
}
/**
* @param data the data to set
*/
- public void setData(Object data){
- this.data=data;
+ public void setData(Object data) {
+ this.data = data;
}
/**
* @return the location
*/
- public Location getLocation(){
+ public Location getLocation() {
return this.location;
}
/**
* @param location the location to set
*/
- public void setLocation(Location location){
- this.location=location;
+ public void setLocation(Location location) {
+ this.location = location;
}
/**
* @return the operationType
*/
- public byte getOperationType(){
+ public byte getOperationType() {
return this.operationType;
}
/**
* @param operationType the operationType to set
*/
- public void setOperationType(byte operationType){
- this.operationType=operationType;
+ public void setOperationType(byte operationType) {
+ this.operationType = operationType;
}
-
- public boolean replay(AMQPersistenceAdapter adapter,ConnectionContext context) throws IOException{
- boolean result=false;
- AMQMessageStore store=(AMQMessageStore)adapter.createMessageStore(destination);
- if(operationType==ADD_OPERATION_TYPE){
- result=store.replayAddMessage(context,(Message)data,location);
- }else if(operationType==REMOVE_OPERATION_TYPE){
- result=store.replayRemoveMessage(context,(MessageAck)data);
- }else{
- JournalTopicAck ack=(JournalTopicAck)data;
- result=((AMQTopicMessageStore)store).replayAcknowledge(context,ack.getClientId(),ack.getSubscritionName(),
- ack.getMessageId());
+ public boolean replay(AMQPersistenceAdapter adapter, ConnectionContext context) throws IOException {
+ boolean result = false;
+ AMQMessageStore store = (AMQMessageStore)adapter.createMessageStore(destination);
+ if (operationType == ADD_OPERATION_TYPE) {
+ result = store.replayAddMessage(context, (Message)data, location);
+ } else if (operationType == REMOVE_OPERATION_TYPE) {
+ result = store.replayRemoveMessage(context, (MessageAck)data);
+ } else {
+ JournalTopicAck ack = (JournalTopicAck)data;
+ result = ((AMQTopicMessageStore)store).replayAcknowledge(context, ack.getClientId(), ack
+ .getSubscritionName(), ack.getMessageId());
}
return result;
}
-
- public void writeExternal(WireFormat wireFormat,DataOutput dos) throws IOException {
+
+ public void writeExternal(WireFormat wireFormat, DataOutput dos) throws IOException {
location.writeExternal(dos);
ByteSequence packet = wireFormat.marshal(getData());
dos.writeInt(packet.length);
@@ -121,16 +119,16 @@
dos.write(packet.data, packet.offset, packet.length);
}
- public void readExternal(WireFormat wireFormat,DataInput dis) throws IOException {
- this.location=new Location();
+ public void readExternal(WireFormat wireFormat, DataInput dis) throws IOException {
+ this.location = new Location();
this.location.readExternal(dis);
- int size=dis.readInt();
- byte[] data=new byte[size];
+ int size = dis.readInt();
+ byte[] data = new byte[size];
dis.readFully(data);
setData(wireFormat.unmarshal(new ByteSequence(data)));
- size=dis.readInt();
- data=new byte[size];
+ size = dis.readInt();
+ data = new byte[size];
dis.readFully(data);
- this.destination=(ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
+ this.destination = (ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Aug 8 11:56:59 2007
@@ -21,53 +21,52 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-final class RecoveryListenerAdapter implements MessageRecoveryListener{
+final class RecoveryListenerAdapter implements MessageRecoveryListener {
- static final private Log log=LogFactory.getLog(RecoveryListenerAdapter.class);
+ static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
private final MessageStore store;
private final MessageRecoveryListener listener;
- private int count=0;
+ private int count = 0;
private MessageId lastRecovered;
- RecoveryListenerAdapter(MessageStore store,MessageRecoveryListener listener){
- this.store=store;
- this.listener=listener;
+ RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) {
+ this.store = store;
+ this.listener = listener;
}
-
- public boolean hasSpace(){
+ public boolean hasSpace() {
return listener.hasSpace();
}
- public boolean recoverMessage(Message message) throws Exception{
- if(listener.hasSpace()){
+ public boolean recoverMessage(Message message) throws Exception {
+ if (listener.hasSpace()) {
listener.recoverMessage(message);
- lastRecovered=message.getMessageId();
+ lastRecovered = message.getMessageId();
count++;
return true;
}
return false;
}
- public boolean recoverMessageReference(MessageId ref) throws Exception{
- Message message=this.store.getMessage(ref);
- if(message!=null){
- return recoverMessage(message);
- }else{
- log.error("Message id "+ref+" could not be recovered from the data store!");
+ public boolean recoverMessageReference(MessageId ref) throws Exception {
+ Message message = this.store.getMessage(ref);
+ if (message != null) {
+ return recoverMessage(message);
+ } else {
+ log.error("Message id " + ref + " could not be recovered from the data store!");
}
return false;
}
-
+
MessageId getLastRecoveredMessageId() {
return lastRecovered;
}
- int size(){
+ int size() {
return count;
}
- void reset(){
- count=0;
+ void reset() {
+ count = 0;
}
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java Wed Aug 8 11:56:59 2007
@@ -65,7 +65,7 @@
public DataSource getDataSource() throws IOException {
if (dataSource == null) {
dataSource = createDataSource();
- if (dataSource == null) {
+ if (dataSource == null) {
throw new IllegalArgumentException("No dataSource property has been configured");
}
}
@@ -88,9 +88,9 @@
ds.setCreateDatabase("create");
return ds;
}
-
- public String toString(){
- return ""+dataSource;
+
+ public String toString() {
+ return "" + dataSource;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Wed Aug 8 11:56:59 2007
@@ -24,69 +24,87 @@
/**
* @version $Revision: 1.5 $
*/
-public interface JDBCAdapter{
+public interface JDBCAdapter {
public void setStatements(Statements statementProvider);
- public abstract void doCreateTables(TransactionContext c) throws SQLException,IOException;
+ public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
- public abstract void doDropTables(TransactionContext c) throws SQLException,IOException;
+ public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;
- public abstract void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
- byte[] data,long expiration) throws SQLException,IOException;
+ public abstract void doAddMessage(TransactionContext c, MessageId messageID,
+ ActiveMQDestination destination, byte[] data, long expiration)
+ throws SQLException, IOException;
- public abstract void doAddMessageReference(TransactionContext c,MessageId messageId,
- ActiveMQDestination destination,long expirationTime,String messageRef) throws SQLException,IOException;
+ public abstract void doAddMessageReference(TransactionContext c, MessageId messageId,
+ ActiveMQDestination destination, long expirationTime,
+ String messageRef) throws SQLException, IOException;
- public abstract byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException;
+ public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
- public abstract String doGetMessageReference(TransactionContext c,long id) throws SQLException,IOException;
+ public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException,
+ IOException;
- public abstract void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException;
+ public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
- public abstract void doRecover(TransactionContext c,ActiveMQDestination destination,
- JDBCMessageRecoveryListener listener) throws Exception;
+ public abstract void doRecover(TransactionContext c, ActiveMQDestination destination,
+ JDBCMessageRecoveryListener listener) throws Exception;
- public abstract void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,long seq) throws SQLException,IOException;
+ public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName, long seq) throws SQLException, IOException;
- public abstract void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception;
+ public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriptionName,
+ JDBCMessageRecoveryListener listener) throws Exception;
- public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
+ public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriptionName, long seq,
+ int maxReturned, JDBCMessageRecoveryListener listener)
+ throws Exception;
- public abstract void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo subscriptionInfo,boolean retroactive) throws SQLException,IOException;
+ public abstract void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo,
+ boolean retroactive) throws SQLException, IOException;
- public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
- String clientId,String subscriptionName) throws SQLException,IOException;
+ public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,
+ ActiveMQDestination destination, String clientId,
+ String subscriptionName) throws SQLException,
+ IOException;
- public abstract long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException;
+ public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
+ IOException;
- public abstract void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName)
- throws SQLException,IOException;
+ public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName)
+ throws SQLException, IOException;
- public abstract void doDeleteSubscription(TransactionContext c,ActiveMQDestination destinationName,String clientId,
- String subscriptionName) throws SQLException,IOException;
+ public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName,
+ String clientId, String subscriptionName) throws SQLException,
+ IOException;
- public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException;
+ public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
- public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException;
+ public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,
+ IOException;
- public abstract Set doGetDestinations(TransactionContext c) throws SQLException,IOException;
+ public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
- public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
- throws SQLException,IOException;
-
- public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName) throws SQLException,IOException;
-
- public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
-
- public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
- JDBCMessageRecoveryListener listener) throws Exception;
-
- public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException;
-}
\ No newline at end of file
+ public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,
+ ActiveMQDestination destination)
+ throws SQLException, IOException;
+
+ public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriptionName)
+ throws SQLException, IOException;
+
+ public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
+ IOException;
+
+ public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
+ int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
+
+ public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
+ ActiveMQDestination destination, String clientId,
+ String subscriberName) throws SQLException,
+ IOException;
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Wed Aug 8 11:56:59 2007
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -32,7 +33,6 @@
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
-
/**
* @version $Revision: 1.10 $
*/
@@ -44,8 +44,7 @@
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
- public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
- ActiveMQDestination destination) {
+ public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
this.persistenceAdapter = persistenceAdapter;
this.adapter = adapter;
this.wireFormat = wireFormat;
@@ -53,15 +52,14 @@
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
-
+
// Serialize the Message..
byte data[];
try {
ByteSequence packet = wireFormat.marshal(message);
data = ByteSequenceData.toByteArray(packet);
} catch (IOException e) {
- throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
- + e, e);
+ throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
}
// Get a connection and insert the message into the DB.
@@ -69,9 +67,8 @@
try {
adapter.doAddMessage(c, message.getMessageId(), destination, data, message.getExpiration());
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
- + e, e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
} finally {
c.close();
}
@@ -83,9 +80,8 @@
try {
adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: "
- + e, e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} finally {
c.close();
}
@@ -94,7 +90,7 @@
public Message getMessage(MessageId messageId) throws IOException {
long id = messageId.getBrokerSequenceId();
-
+
// Get a connection and pull the message out of the DB
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@@ -102,21 +98,21 @@
if (data == null)
return null;
- Message answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
return answer;
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} finally {
c.close();
}
}
-
+
public String getMessageReference(MessageId messageId) throws IOException {
long id = messageId.getBrokerSequenceId();
-
+
// Get a connection and pull the message out of the DB
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@@ -124,7 +120,7 @@
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} finally {
c.close();
@@ -139,7 +135,7 @@
try {
adapter.doRemoveMessage(c, seq);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
} finally {
c.close();
@@ -154,16 +150,17 @@
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
return listener.recoverMessage(msg);
}
+
public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference));
}
});
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
} finally {
c.close();
@@ -185,13 +182,13 @@
try {
adapter.doRemoveAllMessages(c, destination);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
} finally {
c.close();
}
}
-
+
public ActiveMQDestination getDestination() {
return destination;
}
@@ -200,16 +197,15 @@
// we can ignore since we don't buffer up messages.
}
-
- public int getMessageCount() throws IOException{
+ public int getMessageCount() throws IOException {
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
-
+
result = adapter.doGetMessageCount(c, destination);
-
+
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
} finally {
c.close();
@@ -221,50 +217,49 @@
* @param maxReturned
* @param listener
* @throws Exception
- * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
+ * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
+ * org.apache.activemq.store.MessageRecoveryListener)
*/
- public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception{
- TransactionContext c=persistenceAdapter.getTransactionContext();
-
- try{
- adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
- new JDBCMessageRecoveryListener(){
-
- public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
- if(listener.hasSpace()){
- Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- listener.recoverMessage(msg);
- lastMessageId.set(sequenceId);
- return true;
- }
- return false;
- }
-
- public boolean recoverMessageReference(String reference) throws Exception{
- if(listener.hasSpace()) {
- listener.recoverMessageReference(new MessageId(reference));
- return true;
- }
- return false;
- }
-
- });
- }catch(SQLException e){
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- }finally{
+ public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
+ TransactionContext c = persistenceAdapter.getTransactionContext();
+
+ try {
+ adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned, new JDBCMessageRecoveryListener() {
+
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+ if (listener.hasSpace()) {
+ Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ listener.recoverMessage(msg);
+ lastMessageId.set(sequenceId);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean recoverMessageReference(String reference) throws Exception {
+ if (listener.hasSpace()) {
+ listener.recoverMessageReference(new MessageId(reference));
+ return true;
+ }
+ return false;
+ }
+
+ });
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ } finally {
c.close();
}
-
+
}
/**
- *
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
- public void resetBatching(){
+ public void resetBatching() {
lastMessageId.set(-1);
-
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Wed Aug 8 11:56:59 2007
@@ -60,10 +60,12 @@
*
* @version $Revision: 1.9 $
*/
-public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, BrokerServiceAware {
+public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
+ BrokerServiceAware {
private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
- private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/");
+ private static FactoryFinder factoryFinder = new FactoryFinder(
+ "META-INF/services/org/apache/activemq/store/jdbc/");
private WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
@@ -93,20 +95,16 @@
try {
c = getTransactionContext();
return getAdapter().doGetDestinations(c);
- }
- catch (IOException e) {
+ } catch (IOException e) {
return Collections.EMPTY_SET;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
return Collections.EMPTY_SET;
- }
- finally {
+ } finally {
if (c != null) {
try {
c.close();
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
}
}
}
@@ -141,7 +139,7 @@
try {
return getAdapter().doGetLastMessageBrokerSequenceId(c);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
} finally {
c.close();
@@ -159,19 +157,18 @@
getAdapter().doCreateTables(transactionContext);
} catch (SQLException e) {
log.warn("Cannot create tables due to: " + e);
- JDBCPersistenceAdapter.log("Failure Details: ",e);
+ JDBCPersistenceAdapter.log("Failure Details: ", e);
}
} finally {
transactionContext.commit();
}
}
-
+
if (isUseDatabaseLock()) {
DatabaseLocker service = getDatabaseLocker();
if (service == null) {
log.warn("No databaseLocker configured for the JDBC Persistence Adapter");
- }
- else {
+ } else {
service.start();
}
}
@@ -209,20 +206,16 @@
log.debug("Cleaning up old messages.");
c = getTransactionContext();
getAdapter().doDeleteOldMessages(c);
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.warn("Old message cleanup failed due to: " + e, e);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
log.warn("Old message cleanup failed due to: " + e);
JDBCPersistenceAdapter.log("Failure Details: ", e);
- }
- finally {
+ } finally {
if (c != null) {
try {
c.close();
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
}
}
log.debug("Cleanup done.");
@@ -253,7 +246,6 @@
return adapter;
}
-
public DatabaseLocker getDatabaseLocker() throws IOException {
if (databaseLocker == null) {
databaseLocker = createDatabaseLocker();
@@ -274,7 +266,7 @@
public void setDatabaseLocker(DatabaseLocker databaseLocker) {
this.databaseLocker = databaseLocker;
}
-
+
public BrokerService getBrokerService() {
return brokerService;
}
@@ -287,7 +279,7 @@
* @throws IOException
*/
protected JDBCAdapter createAdapter() throws IOException {
- JDBCAdapter adapter=null;
+ JDBCAdapter adapter = null;
TransactionContext c = getTransactionContext();
try {
@@ -298,17 +290,18 @@
dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
try {
- adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName);
+ adapter = (DefaultJDBCAdapter)factoryFinder.newInstance(dirverName);
log.info("Database driver recognized: [" + dirverName + "]");
} catch (Throwable e) {
log.warn("Database driver NOT recognized: [" + dirverName
- + "]. Will use default JDBC implementation.");
+ + "]. Will use default JDBC implementation.");
}
} catch (SQLException e) {
- log.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
- + e.getMessage());
- JDBCPersistenceAdapter.log("Failure Details: ",e);
+ log
+ .warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
+ + e.getMessage());
+ JDBCPersistenceAdapter.log("Failure Details: ", e);
}
// Use the default JDBC adapter if the
@@ -340,7 +333,7 @@
if (context == null) {
return getTransactionContext();
} else {
- TransactionContext answer = (TransactionContext) context.getLongTermStoreContext();
+ TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
if (answer == null) {
answer = new TransactionContext(getDataSource());
context.setLongTermStoreContext(answer);
@@ -373,7 +366,8 @@
}
/**
- * Sets the number of milliseconds until the database is attempted to be cleaned up for durable topics
+ * Sets the number of milliseconds until the database is attempted to be
+ * cleaned up for durable topics
*/
public void setCleanupPeriod(int cleanupPeriod) {
this.cleanupPeriod = cleanupPeriod;
@@ -386,7 +380,7 @@
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
getAdapter().doCreateTables(c);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create(e);
} finally {
c.close();
@@ -400,7 +394,7 @@
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
this.useExternalMessageReferences = useExternalMessageReferences;
}
-
+
public boolean isCreateTablesOnStartup() {
return createTablesOnStartup;
}
@@ -417,23 +411,24 @@
}
/**
- * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
+ * Sets whether or not an exclusive database lock should be used to enable
+ * JDBC Master/Slave. Enabled by default.
*/
public void setUseDatabaseLock(boolean useDatabaseLock) {
this.useDatabaseLock = useDatabaseLock;
}
static public void log(String msg, SQLException e) {
- String s = msg+e.getMessage();
- while( e.getNextException() != null ) {
+ String s = msg + e.getMessage();
+ while (e.getNextException() != null) {
e = e.getNextException();
- s += ", due to: "+e.getMessage();
+ s += ", due to: " + e.getMessage();
}
log.debug(s, e);
}
public Statements getStatements() {
- if( statements == null ) {
+ if (statements == null) {
statements = new Statements();
}
return statements;
@@ -444,12 +439,12 @@
}
/**
- * @param usageManager The UsageManager that is controlling the destination's memory usage.
+ * @param usageManager The UsageManager that is controlling the
+ * destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
-
protected void databaseLockKeepAlive() {
boolean stop = false;
try {
@@ -459,8 +454,7 @@
stop = true;
}
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.error("Failed to get database when trying keepalive: " + e, e);
}
if (stop) {
@@ -473,8 +467,7 @@
log.info("No longer able to keep the exclusive lock so giving up being a master");
try {
brokerService.stop();
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.warn("Failed to stop broker");
}
}
@@ -482,17 +475,17 @@
protected DatabaseLocker createDatabaseLocker() throws IOException {
return new DefaultDatabaseLocker(getDataSource(), getStatements());
}
-
- public void setBrokerName(String brokerName){
+
+ public void setBrokerName(String brokerName) {
}
-
- public String toString(){
- return "JDBCPersistenceAdaptor("+super.toString()+")";
+
+ public String toString() {
+ return "JDBCPersistenceAdaptor(" + super.toString() + ")";
}
- public void setDirectory(File dir){
+ public void setDirectory(File dir) {
}
- public void checkpoint(boolean sync) throws IOException{
+ public void checkpoint(boolean sync) throws IOException {
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Wed Aug 8 11:56:59 2007
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@@ -32,29 +33,26 @@
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
-
/**
* @version $Revision: 1.6 $
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
- private Map subscriberLastMessageMap=new ConcurrentHashMap();
- public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
- ActiveMQTopic topic) {
+ private Map subscriberLastMessageMap = new ConcurrentHashMap();
+
+ public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic) {
super(persistenceAdapter, adapter, wireFormat, topic);
}
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
- throws IOException {
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
long seq = messageId.getBrokerSequenceId();
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message "
- + messageId + " in container: " + e, e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
} finally {
c.close();
}
@@ -62,91 +60,86 @@
/**
* @throws Exception
- *
*/
- public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
- throws Exception {
+ public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
- adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
- new JDBCMessageRecoveryListener() {
- public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- return listener.recoverMessage(msg);
- }
- public boolean recoverMessageReference(String reference) throws Exception {
- return listener.recoverMessageReference(new MessageId(reference));
- }
-
- });
+ adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+ Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ return listener.recoverMessage(msg);
+ }
+
+ public boolean recoverMessageReference(String reference) throws Exception {
+ return listener.recoverMessageReference(new MessageId(reference));
+ }
+
+ });
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
}
- public synchronized void recoverNextMessages(final String clientId,final String subscriptionName,
- final int maxReturned,final MessageRecoveryListener listener) throws Exception{
- TransactionContext c=persistenceAdapter.getTransactionContext();
- String subcriberId=getSubscriptionKey(clientId,subscriptionName);
- AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
- if(last==null){
- long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName);
- last=new AtomicLong(lastAcked);
- subscriberLastMessageMap.put(subcriberId,last);
- }
- final AtomicLong finalLast=last;
- try{
- adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
- new JDBCMessageRecoveryListener(){
-
- public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
- if(listener.hasSpace()){
- Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- listener.recoverMessage(msg);
- finalLast.set(sequenceId);
- return true;
- }
- return false;
- }
-
- public boolean recoverMessageReference(String reference) throws Exception{
- return listener.recoverMessageReference(new MessageId(reference));
- }
-
- });
- }catch(SQLException e){
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- }finally{
+ public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
+ throws Exception {
+ TransactionContext c = persistenceAdapter.getTransactionContext();
+ String subcriberId = getSubscriptionKey(clientId, subscriptionName);
+ AtomicLong last = (AtomicLong)subscriberLastMessageMap.get(subcriberId);
+ if (last == null) {
+ long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
+ last = new AtomicLong(lastAcked);
+ subscriberLastMessageMap.put(subcriberId, last);
+ }
+ final AtomicLong finalLast = last;
+ try {
+ adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), maxReturned, new JDBCMessageRecoveryListener() {
+
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+ if (listener.hasSpace()) {
+ Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ listener.recoverMessage(msg);
+ finalLast.set(sequenceId);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean recoverMessageReference(String reference) throws Exception {
+ return listener.recoverMessageReference(new MessageId(reference));
+ }
+
+ });
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ } finally {
c.close();
last.set(finalLast.get());
}
}
-
- public void resetBatching(String clientId,String subscriptionName) {
- String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+
+ public void resetBatching(String clientId, String subscriptionName) {
+ String subcriberId = getSubscriptionKey(clientId, subscriptionName);
subscriberLastMessageMap.remove(subcriberId);
}
-
+
/**
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
*/
- public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
- throws IOException {
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
c = persistenceAdapter.getTransactionContext();
adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport
- .create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
} finally {
c.close();
}
@@ -161,7 +154,7 @@
try {
return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
@@ -173,11 +166,11 @@
try {
adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
- resetBatching(clientId,subscriptionName);
+ resetBatching(clientId, subscriptionName);
}
}
@@ -186,40 +179,32 @@
try {
return adapter.doGetAllSubscriptions(c, destination);
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
} finally {
c.close();
}
}
-
-
-
-
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ public int getMessageCount(String clientId, String subscriberName) throws IOException {
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
-
+
} catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
-
- protected String getSubscriptionKey(String clientId,String subscriberName){
- String result=clientId+":";
- result+=subscriberName!=null?subscriberName:"NOT_SET";
+
+ protected String getSubscriptionKey(String clientId, String subscriberName) {
+ String result = clientId + ":";
+ result += subscriberName != null ? subscriberName : "NOT_SET";
return result;
}
-
-
-
-
}