You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [19/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jm...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Wed Aug 8 11:56:59 2007
@@ -60,62 +60,63 @@
/** A MessageStore that we can use to retrieve messages quickly. */
private LinkedHashMap cpAddedMessageIds;
-
+
protected RecordLocation lastLocation;
protected HashSet inFlightTxLocations = new HashSet();
private UsageManager usageManager;
-
- public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
+
+ public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore,
+ ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
}
-
+
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
longTermStore.setUsageManager(usageManager);
}
-
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public void addMessage(ConnectionContext context, final Message message) throws IOException {
-
+
final MessageId id = message.getMessageId();
-
+
final boolean debug = log.isDebugEnabled();
message.incrementReferenceCount();
-
+
final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
- if( !context.isInTransaction() ) {
- if( debug )
- log.debug("Journalled message add for: "+id+", at: "+location);
+ if (!context.isInTransaction()) {
+ if (debug)
+ log.debug("Journalled message add for: " + id + ", at: " + location);
addMessage(message, location);
} else {
- if( debug )
- log.debug("Journalled transacted message add for: "+id+", at: "+location);
- synchronized( this ) {
+ if (debug)
+ log.debug("Journalled transacted message add for: " + id + ", at: " + location);
+ synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
- context.getTransaction().addSynchronization(new Synchronization(){
- public void afterCommit() throws Exception {
- if( debug )
- log.debug("Transacted message add commit for: "+id+", at: "+location);
- synchronized( JournalMessageStore.this ) {
+ context.getTransaction().addSynchronization(new Synchronization() {
+ public void afterCommit() throws Exception {
+ if (debug)
+ log.debug("Transacted message add commit for: " + id + ", at: " + location);
+ synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
addMessage(message, location);
}
}
- public void afterRollback() throws Exception {
- if( debug )
- log.debug("Transacted message add rollback for: "+id+", at: "+location);
- synchronized( JournalMessageStore.this ) {
+
+ public void afterRollback() throws Exception {
+ if (debug)
+ log.debug("Transacted message add rollback for: " + id + ", at: " + location);
+ synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
message.decrementReferenceCount();
@@ -131,17 +132,17 @@
messages.put(id, message);
}
}
-
+
public void replayAddMessage(ConnectionContext context, Message message) {
try {
// Only add the message if it has not already been added.
Message t = longTermStore.getMessage(message.getMessageId());
- if( t==null ) {
+ if (t == null) {
longTermStore.addMessage(context, message);
}
- }
- catch (Throwable e) {
- log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
+ } catch (Throwable e) {
+ log.warn("Could not replay add for message '" + message.getMessageId()
+ + "'. Message may have already been added. reason: " + e);
}
}
@@ -152,32 +153,36 @@
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
-
+
final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
- if( !context.isInTransaction() ) {
- if( debug )
- log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+ if (!context.isInTransaction()) {
+ if (debug)
+ log.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
removeMessage(ack, location);
} else {
- if( debug )
- log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
- synchronized( this ) {
+ if (debug)
+ log.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: "
+ + location);
+ synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
- context.getTransaction().addSynchronization(new Synchronization(){
- public void afterCommit() throws Exception {
- if( debug )
- log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
- synchronized( JournalMessageStore.this ) {
+ context.getTransaction().addSynchronization(new Synchronization() {
+ public void afterCommit() throws Exception {
+ if (debug)
+ log.debug("Transacted message remove commit for: " + ack.getLastMessageId()
+ + ", at: " + location);
+ synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
}
}
- public void afterRollback() throws Exception {
- if( debug )
- log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
- synchronized( JournalMessageStore.this ) {
+
+ public void afterRollback() throws Exception {
+ if (debug)
+ log.debug("Transacted message remove rollback for: " + ack.getLastMessageId()
+ + ", at: " + location);
+ synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
@@ -185,12 +190,12 @@
}
}
-
+
final void removeMessage(final MessageAck ack, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
- Message message = (Message) messages.remove(id);
+ Message message = (Message)messages.remove(id);
if (message == null) {
messageAcks.add(ack);
} else {
@@ -198,17 +203,17 @@
}
}
}
-
+
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
Message t = longTermStore.getMessage(messageAck.getLastMessageId());
- if( t!=null ) {
+ if (t != null) {
longTermStore.removeMessage(context, messageAck);
}
- }
- catch (Throwable e) {
- log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
+ } catch (Throwable e) {
+ log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
+ + "'. Message may have already been acknowledged. reason: " + e);
}
}
@@ -219,14 +224,13 @@
public RecordLocation checkpoint() throws IOException {
return checkpoint(null);
}
-
+
/**
* @return
* @throws IOException
*/
public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
-
RecordLocation rc;
final ArrayList cpRemovedMessageLocations;
final ArrayList cpActiveJournalLocations;
@@ -237,37 +241,37 @@
cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks;
- cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
-
+ cpActiveJournalLocations = new ArrayList(inFlightTxLocations);
+
this.messages = new LinkedHashMap();
- this.messageAcks = new ArrayList();
+ this.messageAcks = new ArrayList();
}
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
int size = 0;
-
+
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
ConnectionContext context = transactionTemplate.getContext();
-
+
// Checkpoint the added messages.
- synchronized(JournalMessageStore.this){
- Iterator iterator=cpAddedMessageIds.values().iterator();
- while(iterator.hasNext()){
- Message message=(Message)iterator.next();
- try{
- longTermStore.addMessage(context,message);
- }catch(Throwable e){
- log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+ synchronized (JournalMessageStore.this) {
+ Iterator iterator = cpAddedMessageIds.values().iterator();
+ while (iterator.hasNext()) {
+ Message message = (Message)iterator.next();
+ try {
+ longTermStore.addMessage(context, message);
+ } catch (Throwable e) {
+ log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
- size+=message.getSize();
+ size += message.getSize();
message.decrementReferenceCount();
// Commit the batch if it's getting too big
- if(size>=maxCheckpointMessageAddSize){
+ if (size >= maxCheckpointMessageAddSize) {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
- size=0;
+ size = 0;
}
}
}
@@ -279,14 +283,14 @@
Iterator iterator = cpRemovedMessageLocations.iterator();
while (iterator.hasNext()) {
try {
- MessageAck ack = (MessageAck) iterator.next();
+ MessageAck ack = (MessageAck)iterator.next();
longTermStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
-
- if( postCheckpointTest!= null ) {
+
+ if (postCheckpointTest != null) {
postCheckpointTest.execute();
}
}
@@ -296,12 +300,12 @@
synchronized (this) {
cpAddedMessageIds = null;
}
-
- if( cpActiveJournalLocations.size() > 0 ) {
+
+ if (cpActiveJournalLocations.size() > 0) {
Collections.sort(cpActiveJournalLocations);
- return (RecordLocation) cpActiveJournalLocations.get(0);
+ return (RecordLocation)cpActiveJournalLocations.get(0);
}
- synchronized (this){
+ synchronized (this) {
return lastLocation;
}
}
@@ -314,15 +318,15 @@
synchronized (this) {
// Do we still have it in the journal?
- answer = (Message) messages.get(identity);
- if( answer==null && cpAddedMessageIds!=null )
- answer = (Message) cpAddedMessageIds.get(identity);
+ answer = (Message)messages.get(identity);
+ if (answer == null && cpAddedMessageIds != null)
+ answer = (Message)cpAddedMessageIds.get(identity);
}
-
- if (answer != null ) {
+
+ if (answer != null) {
return answer;
}
-
+
// If all else fails try the long term message store.
return longTermStore.getMessage(identity);
}
@@ -333,7 +337,7 @@
* updated.
*
* @param listener
- * @throws Exception
+ * @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
@@ -341,14 +345,14 @@
}
public void start() throws Exception {
- if( this.usageManager != null )
+ if (this.usageManager != null)
this.usageManager.addUsageListener(peristenceAdapter);
longTermStore.start();
}
public void stop() throws Exception {
longTermStore.stop();
- if( this.usageManager != null )
+ if (this.usageManager != null)
this.usageManager.removeUsageListener(peristenceAdapter);
}
@@ -366,12 +370,13 @@
peristenceAdapter.checkpoint(true, true);
longTermStore.removeAllMessages(context);
}
-
+
public ActiveMQDestination getDestination() {
return destination;
}
- public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+ public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
+ String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
@@ -381,25 +386,23 @@
/**
* @return
- * @throws IOException
+ * @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
- public int getMessageCount() throws IOException{
+ public int getMessageCount() throws IOException {
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount();
}
-
- public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
- longTermStore.recoverNextMessages(maxReturned,listener);
-
+ longTermStore.recoverNextMessages(maxReturned, listener);
+
}
-
- public void resetBatching(){
+ public void resetBatching() {
longTermStore.resetBatching();
-
+
}
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Aug 8 11:56:59 2007
@@ -75,7 +75,6 @@
* other long term persistent storage.
*
* @org.apache.xbean.XBean
- *
* @version $Revision: 1.17 $
*/
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
@@ -89,45 +88,45 @@
private final ConcurrentHashMap queues = new ConcurrentHashMap();
private final ConcurrentHashMap topics = new ConcurrentHashMap();
-
+
private UsageManager usageManager;
long checkpointInterval = 1000 * 60 * 5;
long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
- private int maxCheckpointMessageAddSize = 1024*1024;
+ private int maxCheckpointMessageAddSize = 1024 * 1024;
private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
-
+
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private boolean fullCheckPoint;
-
+
private AtomicBoolean started = new AtomicBoolean(false);
- private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
-
+ private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
+
final Runnable createPeriodicCheckpointTask() {
- return new Runnable() {
- public void run() {
+ return new Runnable() {
+ public void run() {
long lastTime = 0;
- synchronized(this) {
+ synchronized (this) {
lastTime = lastCheckpointRequest;
}
- if( System.currentTimeMillis()>lastTime+checkpointInterval ) {
- checkpoint(false, true);
- }
- }
- };
+ if (System.currentTimeMillis() > lastTime + checkpointInterval) {
+ checkpoint(false, true);
+ }
+ }
+ };
}
-
+
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
this.journal = journal;
journal.setJournalEventListener(this);
-
- checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+
+ checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
public boolean iterate() {
return doCheckpoint();
}
@@ -137,7 +136,8 @@
}
/**
- * @param usageManager The UsageManager that is controlling the destination's memory usage.
+ * @param usageManager The UsageManager that is controlling the
+ * destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
@@ -153,15 +153,14 @@
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
- return createQueueMessageStore((ActiveMQQueue) destination);
- }
- else {
- return createTopicMessageStore((ActiveMQTopic) destination);
+ return createQueueMessageStore((ActiveMQQueue)destination);
+ } else {
+ return createTopicMessageStore((ActiveMQTopic)destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- JournalMessageStore store = (JournalMessageStore) queues.get(destination);
+ JournalMessageStore store = (JournalMessageStore)queues.get(destination);
if (store == null) {
MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
store = new JournalMessageStore(this, checkpointStore, destination);
@@ -171,7 +170,7 @@
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
- JournalTopicMessageStore store = (JournalTopicMessageStore) topics.get(destinationName);
+ JournalTopicMessageStore store = (JournalTopicMessageStore)topics.get(destinationName);
if (store == null) {
TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
@@ -201,24 +200,25 @@
}
public synchronized void start() throws Exception {
- if( !started.compareAndSet(false, true) )
+ if (!started.compareAndSet(false, true)) {
return;
-
+ }
+
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
return t;
- }
+ }
});
- //checkpointExecutor.allowCoreThreadTimeOut(true);
-
+ // checkpointExecutor.allowCoreThreadTimeOut(true);
+
this.usageManager.addUsageListener(this);
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// Disabled periodic clean up as it deadlocks with the checkpoint
// operations.
- ((JDBCPersistenceAdapter) longTermPersistence).setCleanupPeriod(0);
+ ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
}
longTermPersistence.start();
@@ -226,23 +226,23 @@
recover();
// Do a checkpoint periodically.
- Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10);
+ Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
}
public void stop() throws Exception {
-
+
this.usageManager.removeUsageListener(this);
- if( !started.compareAndSet(true, false) )
+ if (!started.compareAndSet(true, false))
return;
-
+
Scheduler.cancel(periodicCheckpointTask);
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
- checkpointTask.shutdown();
+ checkpointTask.shutdown();
checkpointExecutor.shutdown();
-
+
queues.clear();
topics.clear();
@@ -253,7 +253,7 @@
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
longTermPersistence.stop();
-
+
if (firstException != null) {
throw firstException;
}
@@ -287,83 +287,86 @@
/**
* When we checkpoint we move all the journalled data to long term storage.
- * @param stopping
*
+ * @param stopping
* @param b
*/
public void checkpoint(boolean sync, boolean fullCheckpoint) {
try {
- if (journal == null )
+ if (journal == null)
throw new IllegalStateException("Journal is closed.");
-
+
long now = System.currentTimeMillis();
CountDownLatch latch = null;
- synchronized(this) {
+ synchronized (this) {
latch = nextCheckpointCountDownLatch;
lastCheckpointRequest = now;
- if( fullCheckpoint ) {
- this.fullCheckPoint = true;
+ if (fullCheckpoint) {
+ this.fullCheckPoint = true;
}
}
-
+
checkpointTask.wakeup();
-
+
if (sync) {
log.debug("Waking for checkpoint to complete.");
latch.await();
}
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Request to start checkpoint failed: " + e, e);
}
}
-
+
public void checkpoint(boolean sync) {
- checkpoint(sync,sync);
+ checkpoint(sync, sync);
}
-
+
/**
* This does the actual checkpoint.
- * @return
+ *
+ * @return
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
boolean fullCheckpoint;
- synchronized(this) {
+ synchronized (this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
fullCheckpoint = this.fullCheckPoint;
- this.fullCheckPoint=false;
- }
+ this.fullCheckPoint = false;
+ }
try {
log.debug("Checkpoint started.");
RecordLocation newMark = null;
- ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
-
+ ArrayList futureTasks = new ArrayList(queues.size() + topics.size());
+
//
- // We do many partial checkpoints (fullCheckpoint==false) to move topic messages
- // to long term store as soon as possible.
+ // We do many partial checkpoints (fullCheckpoint==false) to move
+ // topic messages
+ // to long term store as soon as possible.
//
- // We want to avoid doing that for queue messages since removes the come in the same
- // checkpoint cycle will nullify the previous message add. Therefore, we only
+ // We want to avoid doing that for queue messages since removes the
+ // come in the same
+ // checkpoint cycle will nullify the previous message add.
+ // Therefore, we only
// checkpoint queues on the fullCheckpoint cycles.
//
- if( fullCheckpoint ) {
+ if (fullCheckpoint) {
Iterator iterator = queues.values().iterator();
while (iterator.hasNext()) {
try {
- final JournalMessageStore ms = (JournalMessageStore) iterator.next();
+ final JournalMessageStore ms = (JournalMessageStore)iterator.next();
FutureTask task = new FutureTask(new Callable() {
public Object call() throws Exception {
return ms.checkpoint();
- }});
+ }
+ });
futureTasks.add(task);
- checkpointExecutor.execute(task);
- }
- catch (Exception e) {
+ checkpointExecutor.execute(task);
+ } catch (Exception e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
}
@@ -372,25 +375,25 @@
Iterator iterator = topics.values().iterator();
while (iterator.hasNext()) {
try {
- final JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
+ final JournalTopicMessageStore ms = (JournalTopicMessageStore)iterator.next();
FutureTask task = new FutureTask(new Callable() {
public Object call() throws Exception {
return ms.checkpoint();
- }});
+ }
+ });
futureTasks.add(task);
- checkpointExecutor.execute(task);
- }
- catch (Exception e) {
+ checkpointExecutor.execute(task);
+ } catch (Exception e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
}
try {
for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
- FutureTask ft = (FutureTask) iter.next();
- RecordLocation mark = (RecordLocation) ft.get();
+ FutureTask ft = (FutureTask)iter.next();
+ RecordLocation mark = (RecordLocation)ft.get();
// We only set a newMark on full checkpoints.
- if( fullCheckpoint ) {
+ if (fullCheckpoint) {
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
@@ -399,38 +402,36 @@
} catch (Throwable e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
-
- if( fullCheckpoint ) {
+ if (fullCheckpoint) {
try {
if (newMark != null) {
log.debug("Marking journal at: " + newMark);
journal.setMark(newMark, true);
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("Failed to mark the Journal: " + e, e);
}
-
+
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
- // We may be check pointing more often than the checkpointInterval if under high use
+ // We may be check pointing more often than the
+ // checkpointInterval if under high use
// But we don't want to clean up the db that often.
long now = System.currentTimeMillis();
- if( now > lastCleanup+checkpointInterval ) {
+ if (now > lastCleanup + checkpointInterval) {
lastCleanup = now;
- ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
+ ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
}
}
}
log.debug("Checkpoint done.");
- }
- finally {
+ } finally {
latch.countDown();
}
- synchronized(this) {
+ synchronized (this) {
return this.fullCheckPoint;
- }
+ }
}
@@ -441,13 +442,11 @@
*/
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
- Packet packet = journal.read(location);
- return (DataStructure) wireFormat.unmarshal(toByteSequence(packet));
- }
- catch (InvalidRecordLocationException e) {
+ Packet packet = journal.read(location);
+ return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
+ } catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw createReadException(location, e);
}
}
@@ -472,49 +471,43 @@
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
- DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
+ DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
- if (c instanceof Message ) {
- Message message = (Message) c;
- JournalMessageStore store = (JournalMessageStore) createMessageStore(message.getDestination());
- if ( message.isInTransaction()) {
+ if (c instanceof Message) {
+ Message message = (Message)c;
+ JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
+ if (message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
- }
- else {
+ } else {
store.replayAddMessage(context, message);
transactionCounter++;
}
} else {
switch (c.getDataStructureType()) {
- case JournalQueueAck.DATA_STRUCTURE_TYPE:
- {
- JournalQueueAck command = (JournalQueueAck) c;
- JournalMessageStore store = (JournalMessageStore) createMessageStore(command.getDestination());
+ case JournalQueueAck.DATA_STRUCTURE_TYPE: {
+ JournalQueueAck command = (JournalQueueAck)c;
+ JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
- }
- else {
+ } else {
store.replayRemoveMessage(context, command.getMessageAck());
transactionCounter++;
}
}
- break;
- case JournalTopicAck.DATA_STRUCTURE_TYPE:
- {
- JournalTopicAck command = (JournalTopicAck) c;
- JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(command.getDestination());
+ break;
+ case JournalTopicAck.DATA_STRUCTURE_TYPE: {
+ JournalTopicAck command = (JournalTopicAck)c;
+ JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
- }
- else {
+ } else {
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
transactionCounter++;
}
}
- break;
- case JournalTransaction.DATA_STRUCTURE_TYPE:
- {
- JournalTransaction command = (JournalTransaction) c;
+ break;
+ case JournalTransaction.DATA_STRUCTURE_TYPE: {
+ JournalTransaction command = (JournalTransaction)c;
try {
// Try to replay the packet.
switch (command.getType()) {
@@ -525,23 +518,23 @@
case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null)
- break; // We may be trying to replay a commit that
- // was already committed.
+ break; // We may be trying to replay a commit
+ // that
+ // was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
- TxOperation op = (TxOperation) iter.next();
+ TxOperation op = (TxOperation)iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
- op.store.replayAddMessage(context, (Message) op.data);
+ op.store.replayAddMessage(context, (Message)op.data);
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
- op.store.replayRemoveMessage(context, (MessageAck) op.data);
+ op.store.replayRemoveMessage(context, (MessageAck)op.data);
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
- JournalTopicAck ack = (JournalTopicAck) op.data;
- ((JournalTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
- .getMessageId());
+ JournalTopicAck ack = (JournalTopicAck)op.data;
+ ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
}
}
transactionCounter++;
@@ -551,14 +544,13 @@
transactionStore.replayRollback(command.getTransactionId());
break;
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
- break;
+ break;
case JournalTrace.DATA_STRUCTURE_TYPE:
- JournalTrace trace = (JournalTrace) c;
+ JournalTrace trace = (JournalTrace)c;
log.debug("TRACE Entry: " + trace.getMessage());
break;
default:
@@ -590,14 +582,13 @@
}
/**
- *
* @param command
* @param sync
* @return
* @throws IOException
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
- if( started.get() )
+ if (started.get())
return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@@ -609,19 +600,19 @@
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
- newPercentUsage = ((newPercentUsage)/10)*10;
- oldPercentUsage = ((oldPercentUsage)/10)*10;
+ newPercentUsage = ((newPercentUsage) / 10) * 10;
+ oldPercentUsage = ((oldPercentUsage) / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
boolean sync = newPercentUsage >= 90;
checkpoint(sync, true);
}
}
-
+
public JournalTransactionStore getTransactionStore() {
return transactionStore;
}
- public void deleteAllMessages() throws IOException {
+ public void deleteAllMessages() throws IOException {
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
@@ -661,28 +652,28 @@
}
public void setUseExternalMessageReferences(boolean enable) {
- if( enable )
+ if (enable)
throw new IllegalArgumentException("The journal does not support message references.");
}
-
+
public Packet toPacket(ByteSequence sequence) {
- return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
+ return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
-
+
public ByteSequence toByteSequence(Packet packet) {
- org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
- return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
+ return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
-
- public void setBrokerName(String brokerName){
+
+ public void setBrokerName(String brokerName) {
longTermPersistence.setBrokerName(brokerName);
}
-
- public String toString(){
+
+ public String toString() {
return "JournalPersistenceAdapator(" + longTermPersistence + ")";
}
- public void setDirectory(File dir){
+ public void setDirectory(File dir) {
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Wed Aug 8 11:56:59 2007
@@ -34,34 +34,34 @@
/**
* Factory class that can create PersistenceAdapter objects.
- *
+ *
* @version $Revision: 1.4 $
*/
public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
-
- private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000;
+
+ private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
-
- private int journalLogFileSize = 1024*1024*20;
+
+ private int journalLogFileSize = 1024 * 1024 * 20;
private int journalLogFiles = 2;
private TaskRunnerFactory taskRunnerFactory;
private Journal journal;
- private boolean useJournal=true;
- private boolean useQuickJournal=false;
+ private boolean useJournal = true;
+ private boolean useQuickJournal = false;
private File journalArchiveDirectory;
- private boolean failIfJournalIsLocked=false;
+ private boolean failIfJournalIsLocked = false;
private int journalThreadPriority = Thread.MAX_PRIORITY;
private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
-
+
public PersistenceAdapter createPersistenceAdapter() throws IOException {
jdbcPersistenceAdapter.setDataSource(getDataSource());
-
- if( !useJournal ) {
+
+ if (!useJournal) {
return jdbcPersistenceAdapter;
}
return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
-
+
}
public int getJournalLogFiles() {
@@ -81,13 +81,13 @@
/**
* Sets the size of the journal log files
- *
+ *
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalLogFileSize(int journalLogFileSize) {
this.journalLogFileSize = journalLogFileSize;
}
-
+
public JDBCPersistenceAdapter getJdbcAdapter() {
return jdbcPersistenceAdapter;
}
@@ -101,8 +101,9 @@
}
/**
- * Enables or disables the use of the journal. The default is to use the journal
- *
+ * Enables or disables the use of the journal. The default is to use the
+ * journal
+ *
* @param useJournal
*/
public void setUseJournal(boolean useJournal) {
@@ -110,8 +111,9 @@
}
public TaskRunnerFactory getTaskRunnerFactory() {
- if( taskRunnerFactory == null ) {
- taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000);
+ if (taskRunnerFactory == null) {
+ taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
+ true, 1000);
}
return taskRunnerFactory;
}
@@ -121,7 +123,7 @@
}
public Journal getJournal() throws IOException {
- if( journal == null ) {
+ if (journal == null) {
createJournal();
}
return journal;
@@ -132,7 +134,7 @@
}
public File getJournalArchiveDirectory() {
- if( journalArchiveDirectory == null && useQuickJournal ) {
+ if (journalArchiveDirectory == null && useQuickJournal) {
journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
}
return journalArchiveDirectory;
@@ -142,15 +144,14 @@
this.journalArchiveDirectory = journalArchiveDirectory;
}
-
public boolean isUseQuickJournal() {
return useQuickJournal;
}
/**
- * Enables or disables the use of quick journal, which keeps messages in the journal and just
- * stores a reference to the messages in JDBC. Defaults to false so that messages actually reside
- * long term in the JDBC database.
+ * Enables or disables the use of quick journal, which keeps messages in the
+ * journal and just stores a reference to the messages in JDBC. Defaults to
+ * false so that messages actually reside long term in the JDBC database.
*/
public void setUseQuickJournal(boolean useQuickJournal) {
this.useQuickJournal = useQuickJournal;
@@ -167,6 +168,7 @@
public Statements getStatements() {
return jdbcPersistenceAdapter.getStatements();
}
+
public void setStatements(Statements statements) {
jdbcPersistenceAdapter.setStatements(statements);
}
@@ -176,7 +178,8 @@
}
/**
- * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
+ * Sets whether or not an exclusive database lock should be used to enable
+ * JDBC Master/Slave. Enabled by default.
*/
public void setUseDatabaseLock(boolean useDatabaseLock) {
jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
@@ -192,16 +195,16 @@
public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
}
-
- public int getJournalThreadPriority(){
+
+ public int getJournalThreadPriority() {
return journalThreadPriority;
}
/**
* Sets the thread priority of the journal thread
*/
- public void setJournalThreadPriority(int journalThreadPriority){
- this.journalThreadPriority=journalThreadPriority;
+ public void setJournalThreadPriority(int journalThreadPriority) {
+ this.journalThreadPriority = journalThreadPriority;
}
/**
@@ -209,15 +212,18 @@
*/
protected void createJournal() throws IOException {
File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
- if( failIfJournalIsLocked ) {
- journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+ if (failIfJournalIsLocked) {
+ journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
+ getJournalArchiveDirectory());
} else {
- while( true ) {
+ while (true) {
try {
- journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+ journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
+ getJournalArchiveDirectory());
break;
} catch (JournalLockedException e) {
- log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked.");
+ log.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
+ + " seconds for the journal to be unlocked.");
try {
Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
@@ -226,7 +232,5 @@
}
}
}
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Wed Aug 8 11:56:59 2007
@@ -41,26 +41,29 @@
* @version $Revision: 1.13 $
*/
public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
-
+
private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class);
private TopicMessageStore longTermStore;
- private HashMap ackedLastAckLocations = new HashMap();
-
- public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, ActiveMQTopic destinationName) {
+ private HashMap ackedLastAckLocations = new HashMap();
+
+ public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
+ ActiveMQTopic destinationName) {
super(adapter, checkpointStore, destinationName);
this.longTermStore = checkpointStore;
}
-
- public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
+ throws Exception {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
-
- public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
+
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+ MessageRecoveryListener listener) throws Exception {
this.peristenceAdapter.checkpoint(true, true);
- longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
-
+ longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
+
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
@@ -75,66 +78,69 @@
public void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
}
-
+
/**
*/
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+ final MessageId messageId) throws IOException {
final boolean debug = log.isDebugEnabled();
-
+
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
- ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
+ ack.setTransactionId(context.getTransaction() != null
+ ? context.getTransaction().getTransactionId() : null);
final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
-
- final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
- if( !context.isInTransaction() ) {
- if( debug )
- log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
+
+ final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ if (!context.isInTransaction()) {
+ if (debug)
+ log.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
acknowledge(messageId, location, key);
} else {
- if( debug )
- log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
+ if (debug)
+ log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this, ack, location);
- context.getTransaction().addSynchronization(new Synchronization(){
- public void afterCommit() throws Exception {
- if( debug )
- log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
+ context.getTransaction().addSynchronization(new Synchronization() {
+ public void afterCommit() throws Exception {
+ if (debug)
+ log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
synchronized (JournalTopicMessageStore.this) {
inFlightTxLocations.remove(location);
acknowledge(messageId, location, key);
}
}
- public void afterRollback() throws Exception {
- if( debug )
- log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
+
+ public void afterRollback() throws Exception {
+ if (debug)
+ log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
synchronized (JournalTopicMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
});
}
-
+
}
-
- public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+
+ public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
+ MessageId messageId) {
try {
SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
- if( sub != null ) {
+ if (sub != null) {
longTermStore.acknowledge(context, clientId, subscritionName, messageId);
}
- }
- catch (Throwable e) {
- log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
+ } catch (Throwable e) {
+ log.debug("Could not replay acknowledge for message '" + messageId
+ + "'. Message may have already been acknowledged. reason: " + e);
}
}
-
/**
* @param messageId
@@ -142,15 +148,15 @@
* @param key
*/
protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
- synchronized(this) {
- lastLocation = location;
- ackedLastAckLocations.put(key, messageId);
- }
+ synchronized (this) {
+ lastLocation = location;
+ ackedLastAckLocations.put(key, messageId);
+ }
}
-
+
public RecordLocation checkpoint() throws IOException {
-
- final HashMap cpAckedLastAckLocations;
+
+ final HashMap cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
@@ -158,15 +164,16 @@
this.ackedLastAckLocations = new HashMap();
}
- return super.checkpoint( new Callback() {
+ return super.checkpoint(new Callback() {
public void execute() throws Exception {
// Checkpoint the acknowledged messages.
Iterator iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) {
- SubscriptionKey subscriptionKey = (SubscriptionKey) iterator.next();
- MessageId identity = (MessageId) cpAckedLastAckLocations.get(subscriptionKey);
- longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
+ SubscriptionKey subscriptionKey = (SubscriptionKey)iterator.next();
+ MessageId identity = (MessageId)cpAckedLastAckLocations.get(subscriptionKey);
+ longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
+ subscriptionKey.subscriptionName, identity);
}
}
@@ -175,30 +182,27 @@
}
/**
- * @return Returns the longTermStore.
- */
- public TopicMessageStore getLongTermTopicMessageStore() {
- return longTermStore;
- }
+ * @return Returns the longTermStore.
+ */
+ public TopicMessageStore getLongTermTopicMessageStore() {
+ return longTermStore;
+ }
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
longTermStore.deleteSubscription(clientId, subscriptionName);
}
-
+
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
-
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ public int getMessageCount(String clientId, String subscriberName) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
- return longTermStore.getMessageCount(clientId,subscriberName);
- }
-
- public void resetBatching(String clientId,String subscriptionName) {
- longTermStore.resetBatching(clientId,subscriptionName);
+ return longTermStore.getMessageCount(clientId, subscriberName);
}
-
+ public void resetBatching(String clientId, String subscriptionName) {
+ longTermStore.resetBatching(clientId, subscriptionName);
+ }
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Wed Aug 8 11:56:59 2007
@@ -36,7 +36,6 @@
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
-
/**
*/
public class JournalTransactionStore implements TransactionStore {
@@ -46,26 +45,27 @@
Map preparedTransactions = new LinkedHashMap();
private boolean doingRecover;
-
public static class TxOperation {
-
- static final byte ADD_OPERATION_TYPE = 0;
- static final byte REMOVE_OPERATION_TYPE = 1;
- static final byte ACK_OPERATION_TYPE = 3;
-
+
+ static final byte ADD_OPERATION_TYPE = 0;
+ static final byte REMOVE_OPERATION_TYPE = 1;
+ static final byte ACK_OPERATION_TYPE = 3;
+
public byte operationType;
public JournalMessageStore store;
public Object data;
-
+
public TxOperation(byte operationType, JournalMessageStore store, Object data) {
- this.operationType=operationType;
- this.store=store;
- this.data=data;
+ this.operationType = operationType;
+ this.store = store;
+ this.data = data;
}
-
+
}
+
/**
* Operations
+ *
* @version $Revision: 1.6 $
*/
public static class Tx {
@@ -74,7 +74,7 @@
private ArrayList operations = new ArrayList();
public Tx(RecordLocation location) {
- this.location=location;
+ this.location = location;
}
public void add(JournalMessageStore store, Message msg) {
@@ -88,12 +88,12 @@
public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
}
-
+
public Message[] getMessages() {
ArrayList list = new ArrayList();
for (Iterator iter = operations.iterator(); iter.hasNext();) {
- TxOperation op = (TxOperation) iter.next();
- if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
+ TxOperation op = (TxOperation)iter.next();
+ if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
list.add(op.data);
}
}
@@ -105,8 +105,8 @@
public MessageAck[] getAcks() {
ArrayList list = new ArrayList();
for (Iterator iter = operations.iterator(); iter.hasNext();) {
- TxOperation op = (TxOperation) iter.next();
- if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
+ TxOperation op = (TxOperation)iter.next();
+ if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
list.add(op.data);
}
}
@@ -129,43 +129,44 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void prepare(TransactionId txid) throws IOException{
- Tx tx=null;
- synchronized(inflightTransactions){
- tx=(Tx)inflightTransactions.remove(txid);
+ public void prepare(TransactionId txid) throws IOException {
+ Tx tx = null;
+ synchronized (inflightTransactions) {
+ tx = (Tx)inflightTransactions.remove(txid);
}
- if(tx==null)
+ if (tx == null)
return;
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
- synchronized(preparedTransactions){
- preparedTransactions.put(txid,tx);
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
+ true);
+ synchronized (preparedTransactions) {
+ preparedTransactions.put(txid, tx);
}
}
-
+
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void replayPrepare(TransactionId txid) throws IOException{
- Tx tx=null;
- synchronized(inflightTransactions){
- tx=(Tx)inflightTransactions.remove(txid);
+ public void replayPrepare(TransactionId txid) throws IOException {
+ Tx tx = null;
+ synchronized (inflightTransactions) {
+ tx = (Tx)inflightTransactions.remove(txid);
}
- if(tx==null)
+ if (tx == null)
return;
- synchronized(preparedTransactions){
- preparedTransactions.put(txid,tx);
+ synchronized (preparedTransactions) {
+ preparedTransactions.put(txid, tx);
}
}
- public Tx getTx(Object txid,RecordLocation location){
- Tx tx=null;
- synchronized(inflightTransactions){
- tx=(Tx)inflightTransactions.get(txid);
+ public Tx getTx(Object txid, RecordLocation location) {
+ Tx tx = null;
+ synchronized (inflightTransactions) {
+ tx = (Tx)inflightTransactions.get(txid);
}
- if(tx==null){
- tx=new Tx(location);
- inflightTransactions.put(txid,tx);
+ if (tx == null) {
+ tx = new Tx(location);
+ inflightTransactions.put(txid, tx);
}
return tx;
}
@@ -174,24 +175,25 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+ public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
Tx tx;
- if(wasPrepared){
- synchronized(preparedTransactions){
- tx=(Tx)preparedTransactions.remove(txid);
+ if (wasPrepared) {
+ synchronized (preparedTransactions) {
+ tx = (Tx)preparedTransactions.remove(txid);
}
- }else{
- synchronized(inflightTransactions){
- tx=(Tx)inflightTransactions.remove(txid);
+ } else {
+ synchronized (inflightTransactions) {
+ tx = (Tx)inflightTransactions.remove(txid);
}
}
- if(tx==null)
+ if (tx == null)
return;
- if(txid.isXATransaction()){
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
- }else{
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
- true);
+ if (txid.isXATransaction()) {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
+ wasPrepared), true);
+ } else {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
+ wasPrepared), true);
}
}
@@ -199,13 +201,13 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
- if(wasPrepared){
- synchronized(preparedTransactions){
+ public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
+ if (wasPrepared) {
+ synchronized (preparedTransactions) {
return (Tx)preparedTransactions.remove(txid);
}
- }else{
- synchronized(inflightTransactions){
+ } else {
+ synchronized (inflightTransactions) {
return (Tx)inflightTransactions.remove(txid);
}
}
@@ -215,21 +217,22 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void rollback(TransactionId txid) throws IOException{
- Tx tx=null;
- synchronized(inflightTransactions){
- tx=(Tx)inflightTransactions.remove(txid);
- }
- if(tx!=null)
- synchronized(preparedTransactions){
- tx=(Tx)preparedTransactions.remove(txid);
- }
- if(tx!=null){
- if(txid.isXATransaction()){
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
- }else{
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
- true);
+ public void rollback(TransactionId txid) throws IOException {
+ Tx tx = null;
+ synchronized (inflightTransactions) {
+ tx = (Tx)inflightTransactions.remove(txid);
+ }
+ if (tx != null)
+ synchronized (preparedTransactions) {
+ tx = (Tx)preparedTransactions.remove(txid);
+ }
+ if (tx != null) {
+ if (txid.isXATransaction()) {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
+ false), true);
+ } else {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
+ txid, false), true);
}
}
}
@@ -238,42 +241,42 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void replayRollback(TransactionId txid) throws IOException{
- boolean inflight=false;
- synchronized(inflightTransactions){
- inflight=inflightTransactions.remove(txid)!=null;
+ public void replayRollback(TransactionId txid) throws IOException {
+ boolean inflight = false;
+ synchronized (inflightTransactions) {
+ inflight = inflightTransactions.remove(txid) != null;
}
- if(inflight){
- synchronized(preparedTransactions){
+ if (inflight) {
+ synchronized (preparedTransactions) {
preparedTransactions.remove(txid);
}
}
}
-
+
public void start() throws Exception {
}
public void stop() throws Exception {
}
-
- synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+
+ synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
// All the in-flight transactions get rolled back..
- synchronized(inflightTransactions){
+ synchronized (inflightTransactions) {
inflightTransactions.clear();
}
- this.doingRecover=true;
- try{
- Map txs=null;
- synchronized(preparedTransactions){
- txs=new LinkedHashMap(preparedTransactions);
- }
- for(Iterator iter=txs.keySet().iterator();iter.hasNext();){
- Object txid=(Object)iter.next();
- Tx tx=(Tx)txs.get(txid);
- listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
+ this.doingRecover = true;
+ try {
+ Map txs = null;
+ synchronized (preparedTransactions) {
+ txs = new LinkedHashMap(preparedTransactions);
+ }
+ for (Iterator iter = txs.keySet().iterator(); iter.hasNext();) {
+ Object txid = (Object)iter.next();
+ Tx tx = (Tx)txs.get(txid);
+ listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
}
- }finally{
- this.doingRecover=false;
+ } finally {
+ this.doingRecover = false;
}
}
@@ -290,40 +293,40 @@
* @param ack
* @throws IOException
*/
- public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) throws IOException {
+ public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
+ throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
-
-
+
public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
-
- public RecordLocation checkpoint() throws IOException{
+ public RecordLocation checkpoint() throws IOException {
// Nothing really to checkpoint.. since, we don't
- // checkpoint tx operations in to long term store until they are committed.
+ // checkpoint tx operations in to long term store until they are
+ // committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
- RecordLocation rc=null;
- synchronized(inflightTransactions){
- for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){
- Tx tx=(Tx)iter.next();
- RecordLocation location=tx.location;
- if(rc==null||rc.compareTo(location)<0){
- rc=location;
+ RecordLocation rc = null;
+ synchronized (inflightTransactions) {
+ for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
+ Tx tx = (Tx)iter.next();
+ RecordLocation location = tx.location;
+ if (rc == null || rc.compareTo(location) < 0) {
+ rc = location;
}
}
}
- synchronized(preparedTransactions){
- for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){
- Tx tx=(Tx)iter.next();
- RecordLocation location=tx.location;
- if(rc==null||rc.compareTo(location)<0){
- rc=location;
+ synchronized (preparedTransactions) {
+ for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
+ Tx tx = (Tx)iter.next();
+ RecordLocation location = tx.location;
+ if (rc == null || rc.compareTo(location) < 0) {
+ rc = location;
}
}
return rc;
@@ -333,6 +336,5 @@
public boolean isDoingRecover() {
return doingRecover;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java Wed Aug 8 11:56:59 2007
@@ -28,35 +28,36 @@
/**
* Marshall an AMQTx
+ *
* @version $Revision: 1.10 $
*/
-public class AMQTxMarshaller implements Marshaller<AMQTx>{
+public class AMQTxMarshaller implements Marshaller<AMQTx> {
private WireFormat wireFormat;
- public AMQTxMarshaller(WireFormat wireFormat){
- this.wireFormat=wireFormat;
+ public AMQTxMarshaller(WireFormat wireFormat) {
+ this.wireFormat = wireFormat;
}
- public AMQTx readPayload(DataInput dataIn) throws IOException{
- Location location=new Location();
+ public AMQTx readPayload(DataInput dataIn) throws IOException {
+ Location location = new Location();
location.readExternal(dataIn);
- AMQTx result=new AMQTx(location);
- int size=dataIn.readInt();
- for(int i=0;i<size;i++){
- AMQTxOperation op=new AMQTxOperation();
- op.readExternal(wireFormat,dataIn);
+ AMQTx result = new AMQTx(location);
+ int size = dataIn.readInt();
+ for (int i = 0; i < size; i++) {
+ AMQTxOperation op = new AMQTxOperation();
+ op.readExternal(wireFormat, dataIn);
result.getOperations().add(op);
}
return result;
}
- public void writePayload(AMQTx amqtx,DataOutput dataOut) throws IOException{
+ public void writePayload(AMQTx amqtx, DataOutput dataOut) throws IOException {
amqtx.getLocation().writeExternal(dataOut);
- List<AMQTxOperation> list=amqtx.getOperations();
+ List<AMQTxOperation> list = amqtx.getOperations();
dataOut.writeInt(list.size());
- for(AMQTxOperation op:list){
- op.writeExternal(wireFormat,dataOut);
+ for (AMQTxOperation op : list) {
+ op.writeExternal(wireFormat, dataOut);
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java Wed Aug 8 11:56:59 2007
@@ -22,20 +22,19 @@
import org.apache.activemq.kaha.Marshaller;
import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Marshall an AtomicInteger
+ *
* @version $Revision: 1.10 $
*/
-public class AtomicIntegerMarshaller implements Marshaller<AtomicInteger>{
-
+public class AtomicIntegerMarshaller implements Marshaller<AtomicInteger> {
+
+ public void writePayload(AtomicInteger ai, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(ai.get());
- public void writePayload(AtomicInteger ai,DataOutput dataOut) throws IOException{
- dataOut.writeInt(ai.get());
-
}
- public AtomicInteger readPayload(DataInput dataIn) throws IOException{
+ public AtomicInteger readPayload(DataInput dataIn) throws IOException {
int value = dataIn.readInt();
return new AtomicInteger(value);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java Wed Aug 8 11:56:59 2007
@@ -22,59 +22,56 @@
*
* @version $Revision: 1.10 $
*/
-public class ConsumerMessageRef{
+public class ConsumerMessageRef {
private MessageId messageId;
private StoreEntry messageEntry;
private StoreEntry ackEntry;
-
+
/**
* @return the ackEntry
*/
- public StoreEntry getAckEntry(){
+ public StoreEntry getAckEntry() {
return this.ackEntry;
}
-
+
/**
* @param ackEntry the ackEntry to set
*/
- public void setAckEntry(StoreEntry ackEntry){
- this.ackEntry=ackEntry;
+ public void setAckEntry(StoreEntry ackEntry) {
+ this.ackEntry = ackEntry;
}
-
+
/**
* @return the messageEntry
*/
- public StoreEntry getMessageEntry(){
+ public StoreEntry getMessageEntry() {
return this.messageEntry;
}
-
+
/**
* @param messageEntry the messageEntry to set
*/
- public void setMessageEntry(StoreEntry messageEntry){
- this.messageEntry=messageEntry;
+ public void setMessageEntry(StoreEntry messageEntry) {
+ this.messageEntry = messageEntry;
}
-
/**
* @return the messageId
*/
- public MessageId getMessageId(){
+ public MessageId getMessageId() {
return this.messageId;
}
-
/**
* @param messageId the messageId to set
*/
- public void setMessageId(MessageId messageId){
- this.messageId=messageId;
+ public void setMessageId(MessageId messageId) {
+ this.messageId = messageId;
}
-
+
public String toString() {
- return "ConsumerMessageRef[" + messageId +"]";
+ return "ConsumerMessageRef[" + messageId + "]";
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java Wed Aug 8 11:56:59 2007
@@ -23,31 +23,30 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.index.IndexItem;
-
/**
* Marshall a TopicSubAck
+ *
* @version $Revision: 1.10 $
*/
-public class ConsumerMessageRefMarshaller implements Marshaller{
-
+public class ConsumerMessageRefMarshaller implements Marshaller {
/**
* @param object
* @param dataOut
* @throws IOException
- * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
+ * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object,
+ * java.io.DataOutput)
*/
- public void writePayload(Object object,DataOutput dataOut) throws IOException{
- ConsumerMessageRef ref = (ConsumerMessageRef) object;
- dataOut.writeUTF(ref.getMessageId().toString());
- IndexItem item = (IndexItem)ref.getMessageEntry();
- dataOut.writeLong(item.getOffset());
- item.write(dataOut);
- item = (IndexItem)ref.getAckEntry();
- dataOut.writeLong(item.getOffset());
- item.write(dataOut);
-
-
+ public void writePayload(Object object, DataOutput dataOut) throws IOException {
+ ConsumerMessageRef ref = (ConsumerMessageRef)object;
+ dataOut.writeUTF(ref.getMessageId().toString());
+ IndexItem item = (IndexItem)ref.getMessageEntry();
+ dataOut.writeLong(item.getOffset());
+ item.write(dataOut);
+ item = (IndexItem)ref.getAckEntry();
+ dataOut.writeLong(item.getOffset());
+ item.write(dataOut);
+
}
/**
@@ -56,7 +55,7 @@
* @throws IOException
* @see org.apache.activemq.kaha.Marshaller#readPayload(java.io.DataInput)
*/
- public Object readPayload(DataInput dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException {
ConsumerMessageRef ref = new ConsumerMessageRef();
ref.setMessageId(new MessageId(dataIn.readUTF()));
IndexItem item = new IndexItem();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Wed Aug 8 11:56:59 2007
@@ -22,18 +22,18 @@
import org.apache.activemq.kaha.Marshaller;
-
/**
* Marshall an Integer
+ *
* @version $Revision: 1.10 $
*/
public class IntegerMarshaller implements Marshaller<Integer> {
-
- public void writePayload(Integer object,DataOutput dataOut) throws IOException{
- dataOut.writeInt(object.intValue());
+
+ public void writePayload(Integer object, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(object.intValue());
}
- public Integer readPayload(DataInput dataIn) throws IOException{
+ public Integer readPayload(DataInput dataIn) throws IOException {
return dataIn.readInt();
}
}