You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/05/12 19:52:54 UTC
svn commit: r405807 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid:
./ RapidMessageReference.java RapidMessageStore.java
RapidPersistenceAdapter.java RapidTopicMessageStore.java
RapidTransactionStore.java
Author: chirino
Date: Fri May 12 10:52:51 2006
New Revision: 405807
URL: http://svn.apache.org/viewcvs?rev=405807&view=rev
Log:
Implementing a Rapid store which is a mix of the QuickJournal and the Kaha store.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java (with props)
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java Fri May 12 10:52:51 2006
@@ -0,0 +1,46 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.rapid;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Holder that pairs a message's id and expiration with the journal
+ * {@link RecordLocation} handed to the constructor.
+ */
+public class RapidMessageReference {
+
+    public final MessageId messageId;
+    public final long expiration;
+    public final RecordLocation location;
+
+    /**
+     * Captures the id and expiration of the given message and remembers the
+     * supplied location alongside them.
+     *
+     * @param message the message whose id and expiration are copied
+     * @param location the journal location to associate with the message
+     */
+    public RapidMessageReference(Message message, RecordLocation location) {
+        this.messageId = message.getMessageId();
+        this.expiration = message.getExpiration();
+        this.location = location;
+    }
+
+    /**
+     * @return the journal location given at construction time
+     */
+    public RecordLocation getLocation() {
+        return this.location;
+    }
+
+    /**
+     * @return the id copied from the message at construction time
+     */
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    /**
+     * @return the expiration copied from the message at construction time
+     */
+    public long getExpiration() {
+        return this.expiration;
+    }
+}
\ No newline at end of file
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri May 12 10:52:51 2006
@@ -0,0 +1,289 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.Location;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.TransactionTemplate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store its messages.
+ *
+ * @version $Revision: 1.14 $
+ */
+public class RapidMessageStore implements MessageStore {
+
+ private static final Log log = LogFactory.getLog(RapidMessageStore.class);
+
+ // Adapter used to write commands to, and read them back from, the journal.
+ protected final RapidPersistenceAdapter peristenceAdapter;
+ // Transaction store obtained from the adapter; transacted adds/removes are registered with it.
+ protected final RapidTransactionStore transactionStore;
+ // Maps MessageId.toString() to the RapidMessageReference of each stored message.
+ protected final MapContainer messageContainer;
+ // Destination this store was created for.
+ protected final ActiveMQDestination destination;
+ protected final TransactionTemplate transactionTemplate;
+
+// private LinkedHashMap messages = new LinkedHashMap();
+// private ArrayList messageAcks = new ArrayList();
+
+// /** A MessageStore that we can use to retrieve messages quickly. */
+// private LinkedHashMap cpAddedMessageIds;
+
+ // Journal location of the most recent add/remove applied to messageContainer;
+ // written while holding this object's monitor.
+ protected RecordLocation lastLocation;
+ // Journal locations of adds/removes whose transaction has neither committed
+ // nor rolled back yet; accessed while holding this object's monitor.
+ protected HashSet inFlightTxLocations = new HashSet();
+
+ /**
+ * Creates a store for one destination.
+ *
+ * @param adapter the persistence adapter that owns the journal and the transaction store
+ * @param destination the destination whose messages this store tracks
+ * @param container the map container that holds the message references
+ */
+ public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) {
+ this.peristenceAdapter = adapter;
+ this.transactionStore = adapter.getTransactionStore();
+ this.messageContainer = container;
+ this.destination = destination;
+ this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+ }
+
+
+ /**
+ * Writes the message to the journal and then records a reference to it in
+ * this store, either immediately or, when the context is in a transaction,
+ * once that transaction commits.
+ * <p>
+ * Not synchronized since the Journal has better throughput if you increase
+ * the number of concurrent writes that it is doing.
+ *
+ * @param context the connection context; decides whether the add is transacted
+ * @param message the message to store
+ * @throws IOException declared for the journal write
+ */
+ public void addMessage(ConnectionContext context, final Message message) throws IOException {
+
+ final MessageId id = message.getMessageId();
+
+ final boolean debug = log.isDebugEnabled();
+ // Journal the message first; the write's sync flag follows the message's response-required flag.
+ final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+ final RapidMessageReference md = new RapidMessageReference(message, location);
+
+ if( !context.isInTransaction() ) {
+ if( debug )
+ log.debug("Journalled message add for: "+id+", at: "+location);
+ addMessage(md);
+ } else {
+ // Transacted add: hold a reference on the message until the transaction finishes.
+ message.incrementReferenceCount();
+ if( debug )
+ log.debug("Journalled transacted message add for: "+id+", at: "+location);
+ // Remember the journal location as in-flight so checkpoint() can report it.
+ synchronized( this ) {
+ inFlightTxLocations.add(location);
+ }
+ transactionStore.addMessage(this, message, location);
+ context.getTransaction().addSynchronization(new Synchronization(){
+ public void afterCommit() throws Exception {
+ if( debug )
+ log.debug("Transacted message add commit for: "+id+", at: "+location);
+ message.decrementReferenceCount();
+ // Clearing the in-flight entry and publishing the reference happen under one lock.
+ synchronized( RapidMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ addMessage(md);
+ }
+ }
+ public void afterRollback() throws Exception {
+ if( debug )
+ log.debug("Transacted message add rollback for: "+id+", at: "+location);
+ message.decrementReferenceCount();
+ // Rolled back: the reference is never added, only the in-flight entry is dropped.
+ synchronized( RapidMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ }
+ }
+ });
+ }
+ }
+
+ private void addMessage(final RapidMessageReference messageReference) {
+ synchronized (this) {
+ lastLocation = messageReference.getLocation();
+ MessageId id = messageReference.getMessageId();
+ messageContainer.put(id.toString(), messageReference);
+ }
+ }
+
+ static protected String toString(RecordLocation location) {
+ Location l = (Location) location;
+ return l.getLogFileId()+":"+l.getLogFileOffset();
+ }
+
+ static protected RecordLocation toRecordLocation(String t) {
+ String[] strings = t.split(":");
+ if( strings.length!=2 )
+ throw new IllegalArgumentException("Invalid location: "+t);
+ return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
+ }
+
+ public void replayAddMessage(ConnectionContext context, Message message, RecordLocation location) {
+ try {
+ RapidMessageReference messageReference = new RapidMessageReference(message, location);
+ messageContainer.put(message.getMessageId().toString(), messageReference);
+ }
+ catch (Throwable e) {
+ log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
+ }
+ }
+
+ /**
+ * Journals an acknowledgement for this destination and then removes the
+ * acknowledged message's reference from this store, either immediately or,
+ * when the context is in a transaction, once that transaction commits.
+ *
+ * @param context the connection context; decides whether the remove is transacted
+ * @param ack the acknowledgement; its last message id identifies the message to remove
+ * @throws IOException declared for the journal write
+ */
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ final boolean debug = log.isDebugEnabled();
+ // Wrap the ack together with the destination so the journal record carries both.
+ JournalQueueAck remove = new JournalQueueAck();
+ remove.setDestination(destination);
+ remove.setMessageAck(ack);
+
+ final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+ if( !context.isInTransaction() ) {
+ if( debug )
+ log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+ removeMessage(ack, location);
+ } else {
+ if( debug )
+ log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
+ // Remember the journal location as in-flight so checkpoint() can report it.
+ synchronized( this ) {
+ inFlightTxLocations.add(location);
+ }
+ transactionStore.removeMessage(this, ack, location);
+ context.getTransaction().addSynchronization(new Synchronization(){
+ public void afterCommit() throws Exception {
+ if( debug )
+ log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
+ // Clearing the in-flight entry and applying the remove happen under one lock.
+ synchronized( RapidMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ removeMessage(ack, location);
+ }
+ }
+ public void afterRollback() throws Exception {
+ if( debug )
+ log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
+ // Rolled back: the reference stays, only the in-flight entry is dropped.
+ synchronized( RapidMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ }
+ }
+ });
+
+ }
+ }
+
+ private void removeMessage(final MessageAck ack, final RecordLocation location) {
+ synchronized (this) {
+ lastLocation = location;
+ MessageId id = ack.getLastMessageId();
+ messageContainer.remove(id.toString());
+ }
+ }
+
+ public void replayRemoveMessage(ConnectionContext context, MessageAck ack) {
+ try {
+ MessageId id = ack.getLastMessageId();
+ messageContainer.remove(id.toString());
+ }
+ catch (Throwable e) {
+ log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
+ }
+ }
+
+    /**
+     * Looks up the reference stored for the given id and, when one exists,
+     * reads the message back through the persistence adapter from the
+     * reference's journal location.
+     *
+     * @param id the id of the message to load
+     * @return the message, or null when no reference is stored for the id
+     * @throws IOException if the read through the persistence adapter fails
+     */
+    public Message getMessage(MessageId id) throws IOException {
+        final RapidMessageReference found = (RapidMessageReference) messageContainer.get(id.toString());
+        if (found != null) {
+            final RecordLocation where = found.getLocation();
+            return (Message) peristenceAdapter.readCommand(where);
+        }
+        return null;
+    }
+
+    /**
+     * Hands every message currently referenced by this store to the listener.
+     * Each reference in the message container is resolved by reading the
+     * message back through the persistence adapter; after the last one the
+     * listener is told that recovery has finished.
+     *
+     * @param listener receives each recovered message and the final notification
+     * @throws Exception if a message cannot be read or the listener fails
+     */
+    public void recover(final MessageRecoveryListener listener) throws Exception {
+        final Iterator references = messageContainer.values().iterator();
+        while (references.hasNext()) {
+            final RapidMessageReference reference = (RapidMessageReference) references.next();
+            final RecordLocation where = reference.getLocation();
+            final Message recovered = (Message) peristenceAdapter.readCommand(where);
+            listener.recoverMessage(recovered);
+        }
+        listener.finished();
+    }
+
+ /**
+ * No-op: this store does nothing on start.
+ */
+ public void start() throws Exception {
+ }
+
+ /**
+ * No-op: this store does nothing on stop.
+ */
+ public void stop() throws Exception {
+ }
+
+ /**
+ * Clears the message container. Nothing is written to the journal and the
+ * call is not synchronized on this store.
+ *
+ * @param context the connection context (not used here)
+ * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
+ */
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ messageContainer.clear();
+ }
+
+ /**
+ * @return the destination this store was created for
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ /**
+ * Not supported by this store.
+ *
+ * @throws IOException always
+ */
+ public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+ throw new IOException("The journal does not support message references.");
+ }
+
+ /**
+ * Not supported by this store.
+ *
+ * @throws IOException always
+ */
+ public String getMessageReference(MessageId identity) throws IOException {
+ throw new IOException("The journal does not support message references.");
+ }
+
+
+ /**
+ * No-op: the supplied usage manager is ignored by this store.
+ */
+ public void setUsageManager(UsageManager usageManager) {
+ }
+
+    /**
+     * Reports a journal location for this store's checkpoint.
+     * <p>
+     * When adds or removes belonging to unfinished transactions exist, the
+     * smallest of their journal locations (by natural ordering) is returned;
+     * otherwise the location of the most recent add/remove applied to the
+     * message container is returned.
+     * <p>
+     * Both pieces of state are snapshotted under this object's monitor, the
+     * same lock that guards their updates, so the value returned is never a
+     * stale read of {@code lastLocation}.
+     *
+     * @return the location described above; null when nothing has been
+     *         applied yet and no transaction is in flight
+     * @throws IOException never thrown by this implementation; kept for the
+     *         existing signature
+     */
+    public RecordLocation checkpoint() throws IOException {
+
+        final ArrayList cpActiveJournalLocations;
+        final RecordLocation cpLastLocation;
+
+        // Copy the in-flight set and read lastLocation atomically with respect
+        // to addMessage/removeMessage and the transaction callbacks.
+        synchronized (this) {
+            cpActiveJournalLocations = new ArrayList(inFlightTxLocations);
+            cpLastLocation = lastLocation;
+        }
+
+        if (cpActiveJournalLocations.isEmpty()) {
+            return cpLastLocation;
+        }
+
+        // Only the earliest in-flight location is needed, so a full sort is unnecessary.
+        return (RecordLocation) Collections.min(cpActiveJournalLocations);
+    }
+
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Fri May 12 10:52:51 2006
@@ -0,0 +1,672 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalEventListener;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreFactory;
+import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
+import org.apache.activemq.store.kahadaptor.CommandMarshaller;
+import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
+import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Callable;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.17 $
+ */
+public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
+
+ private static final Log log = LogFactory.getLog(RapidPersistenceAdapter.class);
+ // Journal that every command is written to and read back from.
+ private final Journal journal;
+
+ // Wire format used to unmarshal journal records (and handed to the container value marshaller).
+ private final WireFormat wireFormat = new OpenWireFormat();
+
+ // Message stores created so far, keyed by destination.
+ private final ConcurrentHashMap queues = new ConcurrentHashMap();
+ private final ConcurrentHashMap topics = new ConcurrentHashMap();
+
+ // Five minutes, in milliseconds.
+ private long checkpointInterval = 1000 * 60 * 5;
+ // Time of the most recent checkpoint request; updated in checkpoint().
+ private long lastCheckpointRequest = System.currentTimeMillis();
+ // Only referenced from commented-out clean-up code in doCheckpoint().
+ private long lastCleanup = System.currentTimeMillis();
+ // Size of the checkpoint executor's thread pool created in start().
+ private int maxCheckpointWorkers = 10;
+ // NOTE(review): not read anywhere in the visible part of this class - confirm it is still needed.
+ private int maxCheckpointMessageAddSize = 5000;
+
+ private RapidTransactionStore transactionStore = new RapidTransactionStore(this);
+ // Runs the per-store checkpoint tasks; created in start().
+ private ThreadPoolExecutor checkpointExecutor;
+
+ // Task runner whose iterations call doCheckpoint(); woken up by checkpoint().
+ private TaskRunner checkpointTask;
+ // Latch released when the next checkpoint cycle finishes; swapped by doCheckpoint().
+ private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+ // Set when a full checkpoint has been requested and not yet picked up by doCheckpoint().
+ private boolean fullCheckPoint;
+
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ // Kaha store opened in the constructor; provides the map containers.
+ Store store;
+ // Selects the value marshaller used in getMapContainer().
+ private boolean useExternalMessageReferences;
+
+
+ // Scheduled from start(); requests a full checkpoint once more than
+ // checkpointInterval has passed since the last checkpoint request.
+ private final Runnable periodicCheckpointTask = new Runnable() {
+ public void run() {
+ if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+ checkpoint(false, true);
+ }
+ }
+ };
+
+ /**
+ * Creates the adapter on top of the given journal, registers it as the
+ * journal's event listener, opens a kaha store file named "kaha.db" in the
+ * journal's log directory and creates the checkpoint task runner.
+ *
+ * @param journal the journal to use; it is cast to {@link JournalImpl} to find the log directory
+ * @param taskRunnerFactory factory used to create the checkpoint task runner
+ * @throws IOException declared for opening the kaha store
+ */
+ public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
+
+ this.journal = journal;
+ journal.setJournalEventListener(this);
+
+ // The kaha store lives next to the journal's log files.
+ File dir = ((JournalImpl)journal).getLogDirectory();
+ String name=dir.getAbsolutePath()+File.separator+"kaha.db";
+ store=StoreFactory.open(name,"rw");
+
+ // Each iteration of the task performs one checkpoint cycle.
+ checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+ public boolean iterate() {
+ return doCheckpoint();
+ }
+ }, "ActiveMQ Checkpoint Worker");
+
+ }
+
+    /**
+     * Collects every map container id of the kaha store that is an
+     * {@link ActiveMQDestination}. An IOException while listing the ids is
+     * logged and whatever was collected so far is returned.
+     *
+     * @return the destinations found; never null
+     */
+    public Set getDestinations() {
+        final Set destinations = new HashSet();
+        try {
+            final Iterator ids = store.getMapContainerIds().iterator();
+            while (ids.hasNext()) {
+                final Object id = ids.next();
+                if (id instanceof ActiveMQDestination) {
+                    destinations.add(id);
+                }
+            }
+        } catch (IOException e) {
+            log.error("Failed to get destinations " ,e);
+        }
+        return destinations;
+    }
+
+    /**
+     * Returns the message store for the destination, dispatching on whether it
+     * is a queue or not.
+     *
+     * @param destination the destination to get a store for
+     * @return the queue store for queues, the topic store otherwise
+     * @throws IOException if the store cannot be created
+     */
+    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+        if (!destination.isQueue()) {
+            return createTopicMessageStore((ActiveMQTopic) destination);
+        }
+        return createQueueMessageStore((ActiveMQQueue) destination);
+    }
+
+    /**
+     * Returns the message store for the given queue, creating and caching it
+     * on first use.
+     * <p>
+     * The queue's message references are kept in a map container named
+     * "queue-data", so that queue data is not filed under the container name
+     * used for topics.
+     *
+     * @param destination the queue to get a store for
+     * @return the cached or newly created store
+     * @throws IOException if the map container cannot be obtained
+     */
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        // Named queueStore so it does not shadow the kaha "store" field.
+        RapidMessageStore queueStore = (RapidMessageStore) queues.get(destination);
+        if (queueStore == null) {
+            MapContainer messageContainer = getMapContainer(destination, "queue-data");
+            queueStore = new RapidMessageStore(this, destination, messageContainer);
+            queues.put(destination, queueStore);
+        }
+        return queueStore;
+    }
+
+ protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
+ MapContainer container=store.getMapContainer(id,containerName);
+ container.setKeyMarshaller(new StringMarshaller());
+ if(useExternalMessageReferences){
+ container.setValueMarshaller(new StringMarshaller());
+ }else{
+ container.setValueMarshaller(new CommandMarshaller(wireFormat));
+ }
+ container.load();
+ return container;
+ }
+
+    /**
+     * Returns the message store for the given topic, creating and caching it
+     * on first use. A new store is given three map containers: "topic-data"
+     * for the messages, "topic-subs" for the subscriptions and "topic-acks"
+     * (string keys, AtomicInteger values) for the acknowledgement counts.
+     *
+     * @param destination the topic to get a store for
+     * @return the cached or newly created store
+     * @throws IOException if a map container cannot be obtained
+     */
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+        RapidTopicMessageStore topicStore = (RapidTopicMessageStore) topics.get(destination);
+        if (topicStore != null) {
+            return topicStore;
+        }
+
+        final MapContainer messages = getMapContainer(destination, "topic-data");
+        final MapContainer subscriptions = getMapContainer(destination.toString()+"-subscriptions", "topic-subs");
+
+        // The ack container is obtained directly from the kaha store and only has its marshallers set here.
+        final MapContainer acks = this.store.getMapContainer(destination.toString(), "topic-acks");
+        acks.setKeyMarshaller(new StringMarshaller());
+        acks.setValueMarshaller(new AtomicIntegerMarshaller());
+
+        topicStore = new RapidTopicMessageStore(this, destination, messages, subscriptions, acks);
+        topics.put(destination, topicStore);
+        return topicStore;
+    }
+
+ /**
+ * @return the single transaction store instance owned by this adapter
+ */
+ public TransactionStore createTransactionStore() throws IOException {
+ return transactionStore;
+ }
+
+ /**
+ * Not implemented yet.
+ *
+ * @return always 0
+ */
+ public long getLastMessageBrokerSequenceId() throws IOException {
+ // TODO: implement this.
+ return 0;
+ }
+
+ /**
+ * No-op in this adapter.
+ */
+ public void beginTransaction(ConnectionContext context) throws IOException {
+ }
+
+ /**
+ * No-op in this adapter.
+ */
+ public void commitTransaction(ConnectionContext context) throws IOException {
+ }
+
+ /**
+ * No-op in this adapter.
+ */
+ public void rollbackTransaction(ConnectionContext context) throws IOException {
+ }
+
+ /**
+ * Starts the adapter: creates the checkpoint worker pool, replays the
+ * journal and schedules the periodic checkpoint task. Calling it again
+ * while already started does nothing.
+ *
+ * @throws Exception if journal recovery fails
+ */
+ public synchronized void start() throws Exception {
+ // Only the call that flips the flag from false to true proceeds.
+ if( !started.compareAndSet(false, true) )
+ return;
+
+ checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
+ public Thread newThread(Runnable runable) {
+ Thread t = new Thread(runable, "Journal checkpoint worker");
+ t.setPriority(7);
+ return t;
+ }
+ });
+ // Lets idle core threads end after the 30 second keep-alive.
+ checkpointExecutor.allowCoreThreadTimeOut(true);
+
+ createTransactionStore();
+ recover();
+
+ // Do a checkpoint periodically.
+ // The task runs ten times per checkpointInterval and itself checks whether a checkpoint is due.
+ Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10);
+
+ }
+
+    /**
+     * Stops the adapter: cancels the periodic checkpoint task, takes one final
+     * full checkpoint and waits for it to complete, shuts down the checkpoint
+     * workers, forgets the cached message stores and closes the journal and
+     * the kaha store. Calling it while not started does nothing.
+     * <p>
+     * Both the journal and the kaha store are always closed. When both closes
+     * fail, the journal failure is thrown and the store failure is logged.
+     *
+     * @throws Exception the first failure met while closing the journal or the store
+     */
+    public void stop() throws Exception {
+
+        // Only the call that flips the flag from true to false proceeds.
+        if (!started.compareAndSet(true, false)) {
+            return;
+        }
+
+        Scheduler.cancel(periodicCheckpointTask);
+
+        // Take one final checkpoint and stop checkpoint processing. The request
+        // is synchronous: an asynchronous one could still be pending when the
+        // task runner and executor are shut down below and would never run.
+        checkpoint(true, true);
+        checkpointTask.shutdown();
+        checkpointExecutor.shutdown();
+
+        queues.clear();
+        topics.clear();
+
+        IOException firstException = null;
+        try {
+            journal.close();
+        } catch (Exception e) {
+            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+        }
+
+        try {
+            store.close();
+        } catch (Exception e) {
+            if (firstException == null) {
+                // Nothing failed before: surface the store failure unchanged.
+                throw e;
+            }
+            // Keep the earlier journal failure as the one reported to the caller.
+            log.error("Failed to close the store: " + e, e);
+        }
+
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+
+ // Properties
+ // -------------------------------------------------------------------------
+
+ /**
+ * @return the wire format this adapter uses to unmarshal journal records
+ */
+ public WireFormat getWireFormat() {
+ return wireFormat;
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+
+ /**
+ * The Journal gives us a callback so that we can move old data out of the
+ * journal. This implementation responds by requesting an asynchronous full
+ * checkpoint; the supplied location is not used.
+ *
+ * @param safeLocation location passed in by the journal (ignored here)
+ * @see org.apache.activeio.journal.JournalEventListener#overflowNotification(org.apache.activeio.journal.RecordLocation)
+ */
+ public void overflowNotification(RecordLocation safeLocation) {
+ checkpoint(false, true);
+ }
+
+ /**
+ * When we checkpoint we move all the journalled data to long term storage.
+ * @param stopping
+ *
+ * @param b
+ */
+ public void checkpoint(boolean sync, boolean fullCheckpoint) {
+ try {
+ if (journal == null )
+ throw new IllegalStateException("Journal is closed.");
+
+ long now = System.currentTimeMillis();
+ CountDownLatch latch = null;
+ synchronized(this) {
+ latch = nextCheckpointCountDownLatch;
+ lastCheckpointRequest = now;
+ if( fullCheckpoint ) {
+ this.fullCheckPoint = true;
+ }
+ }
+
+ checkpointTask.wakeup();
+
+ if (sync) {
+ log.debug("Waking for checkpoint to complete.");
+ latch.await();
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn("Request to start checkpoint failed: " + e, e);
+ }
+ }
+
+ /**
+ * This does the actual checkpoint: it runs the checkpoint of every topic
+ * store (and, on a full checkpoint, of every queue store) on the checkpoint
+ * executor, waits for the results and, on a full checkpoint, marks the
+ * journal with the location chosen from them. The latch that was current
+ * when the cycle started is always released at the end.
+ *
+ * @return true when another full checkpoint was requested while this cycle
+ *         was running (presumably telling the task runner to iterate again
+ *         - confirm against Task#iterate)
+ */
+ public boolean doCheckpoint() {
+ CountDownLatch latch = null;
+ boolean fullCheckpoint;
+ // Take over the current latch and flag, and install a fresh latch so that
+ // requests arriving from now on wait for the next cycle.
+ synchronized(this) {
+ latch = nextCheckpointCountDownLatch;
+ nextCheckpointCountDownLatch = new CountDownLatch(1);
+ fullCheckpoint = this.fullCheckPoint;
+ this.fullCheckPoint=false;
+ }
+ try {
+
+ log.debug("Checkpoint started.");
+ RecordLocation newMark = null;
+
+ ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
+
+ //
+ // We do many partial checkpoints (fullCheckpoint==false) to move topic messages
+ // to long term store as soon as possible.
+ //
+ // We want to avoid doing that for queue messages since removes the come in the same
+ // checkpoint cycle will nullify the previous message add. Therefore, we only
+ // checkpoint queues on the fullCheckpoint cycles.
+ //
+ if( fullCheckpoint ) {
+ Iterator iterator = queues.values().iterator();
+ while (iterator.hasNext()) {
+ try {
+ final RapidMessageStore ms = (RapidMessageStore) iterator.next();
+ FutureTask task = new FutureTask(new Callable() {
+ public Object call() throws Exception {
+ return ms.checkpoint();
+ }});
+ futureTasks.add(task);
+ checkpointExecutor.execute(task);
+ }
+ catch (Exception e) {
+ log.error("Failed to checkpoint a message store: " + e, e);
+ }
+ }
+ }
+
+ // Topic stores are checkpointed on every cycle, full or partial.
+ Iterator iterator = topics.values().iterator();
+ while (iterator.hasNext()) {
+ try {
+ final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next();
+ FutureTask task = new FutureTask(new Callable() {
+ public Object call() throws Exception {
+ return ms.checkpoint();
+ }});
+ futureTasks.add(task);
+ checkpointExecutor.execute(task);
+ }
+ catch (Exception e) {
+ log.error("Failed to checkpoint a message store: " + e, e);
+ }
+ }
+
+ // Wait for every submitted task. The first failure ends the loop, so the
+ // results of the remaining tasks are not collected.
+ try {
+ for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
+ FutureTask ft = (FutureTask) iter.next();
+ RecordLocation mark = (RecordLocation) ft.get();
+ // We only set a newMark on full checkpoints.
+ // The comparison keeps the greatest non-null mark returned by the stores.
+ // NOTE(review): RapidMessageStore.checkpoint() returns its earliest in-flight
+ // transaction location; confirm that taking the greatest mark across stores
+ // (rather than the smallest) is intended.
+ if( fullCheckpoint ) {
+ if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+ newMark = mark;
+ }
+ }
+ }
+ } catch (Throwable e) {
+ log.error("Failed to checkpoint a message store: " + e, e);
+ }
+
+
+ if( fullCheckpoint ) {
+ try {
+ if (newMark != null) {
+ log.debug("Marking journal at: " + newMark);
+ journal.setMark(newMark, true);
+ }
+ }
+ catch (Exception e) {
+ log.error("Failed to mark the Journal: " + e, e);
+ }
+
+// TODO: do we need to implement a periodic clean up?
+
+// if (longTermPersistence instanceof JDBCPersistenceAdapter) {
+// // We may be check pointing more often than the checkpointInterval if under high use
+// // But we don't want to clean up the db that often.
+// long now = System.currentTimeMillis();
+// if( now > lastCleanup+checkpointInterval ) {
+// lastCleanup = now;
+// ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
+// }
+// }
+ }
+
+ log.debug("Checkpoint done.");
+ }
+ finally {
+ // Release everyone waiting on this cycle, even when it failed.
+ latch.countDown();
+ }
+ synchronized(this) {
+ return this.fullCheckPoint;
+ }
+
+ }
+
+ /**
+ * @param location
+ * @return
+ * @throws IOException
+ */
+ public DataStructure readCommand(RecordLocation location) throws IOException {
+ try {
+ Packet data = journal.read(location);
+ return (DataStructure) wireFormat.unmarshal(data);
+ }
+ catch (InvalidRecordLocationException e) {
+ throw createReadException(location, e);
+ }
+ catch (IOException e) {
+ throw createReadException(location, e);
+ }
+ }
+
+ /**
+ * Replays every record in the journal, starting from the first location the
+ * journal reports, and applies it to the matching message store.
+ * Non-transacted adds, removes and topic acks are applied directly;
+ * transacted ones are handed to the transaction store and applied when the
+ * matching commit record is replayed. Afterwards a "RECOVERED" trace record
+ * is written and the journal is marked at its location.
+ *
+ * @throws IOException if reading the journal or creating a store fails
+ * @throws InvalidRecordLocationException if the journal rejects a location
+ * @throws IllegalStateException declared by the journal operations used here
+ */
+ private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
+
+ RecordLocation pos = null;
+ // Counts directly applied operations plus replayed commits (see the final log line).
+ int transactionCounter = 0;
+
+ log.info("Journal Recovery Started.");
+ ConnectionContext context = new ConnectionContext();
+
+ // While we have records in the journal.
+ while ((pos = journal.getNextRecordLocation(pos)) != null) {
+ Packet data = journal.read(pos);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+ if (c instanceof Message ) {
+ Message message = (Message) c;
+ RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination());
+ if ( message.isInTransaction()) {
+ // Deferred until the transaction's commit record is seen.
+ transactionStore.addMessage(store, message, pos);
+ }
+ else {
+ store.replayAddMessage(context, message, pos);
+ transactionCounter++;
+ }
+ } else {
+ switch (c.getDataStructureType()) {
+ case JournalQueueAck.DATA_STRUCTURE_TYPE:
+ {
+ JournalQueueAck command = (JournalQueueAck) c;
+ RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination());
+ if (command.getMessageAck().isInTransaction()) {
+ transactionStore.removeMessage(store, command.getMessageAck(), pos);
+ }
+ else {
+ store.replayRemoveMessage(context, command.getMessageAck());
+ transactionCounter++;
+ }
+ }
+ break;
+ case JournalTopicAck.DATA_STRUCTURE_TYPE:
+ {
+ JournalTopicAck command = (JournalTopicAck) c;
+ RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination());
+ if (command.getTransactionId() != null) {
+ transactionStore.acknowledge(store, command, pos);
+ }
+ else {
+ store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
+ transactionCounter++;
+ }
+ }
+ break;
+ case JournalTransaction.DATA_STRUCTURE_TYPE:
+ {
+ JournalTransaction command = (JournalTransaction) c;
+ try {
+ // Try to replay the packet.
+ switch (command.getType()) {
+ case JournalTransaction.XA_PREPARE:
+ transactionStore.replayPrepare(command.getTransactionId());
+ break;
+ case JournalTransaction.XA_COMMIT:
+ case JournalTransaction.LOCAL_COMMIT:
+ Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+ if (tx == null)
+ break; // We may be trying to replay a commit that
+ // was already committed.
+
+ // Replay the committed operations.
+ for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+ TxOperation op = (TxOperation) iter.next();
+ if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
+ op.store.replayAddMessage(context, (Message) op.data, op.location);
+ }
+ if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
+ op.store.replayRemoveMessage(context, (MessageAck) op.data);
+ }
+ if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
+ JournalTopicAck ack = (JournalTopicAck) op.data;
+ ((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
+ .getMessageId());
+ }
+ }
+ transactionCounter++;
+ break;
+ case JournalTransaction.LOCAL_ROLLBACK:
+ case JournalTransaction.XA_ROLLBACK:
+ transactionStore.replayRollback(command.getTransactionId());
+ break;
+ }
+ }
+ catch (IOException e) {
+ // A transaction record that cannot be replayed is logged and recovery continues.
+ log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+ }
+ }
+ break;
+ case JournalTrace.DATA_STRUCTURE_TYPE:
+ JournalTrace trace = (JournalTrace) c;
+ log.debug("TRACE Entry: " + trace.getMessage());
+ break;
+ default:
+ log.error("Unknown type of record in transaction log which will be discarded: " + c);
+ }
+ }
+ }
+
+ // Record that recovery ran and mark the journal at that trace record.
+ RecordLocation location = writeTraceMessage("RECOVERED", true);
+ journal.setMark(location, true);
+
+ log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+ }
+
+ /**
+  * Builds the IOException reported when a journal record cannot be read.
+  *
+  * @param location the journal location whose read failed
+  * @param e the failure that was caught; handed to IOExceptionSupport together with the message
+  * @return the exception produced by IOExceptionSupport.create
+  */
+ private IOException createReadException(RecordLocation location, Exception e) {
+     String detail = "Failed to read to journal for: " + location + ". Reason: " + e;
+     return IOExceptionSupport.create(detail, e);
+ }
+
+ /**
+  * Builds the IOException reported when a data structure cannot be written to the journal.
+  *
+  * @param packet the data structure whose write failed
+  * @param e the failure that was caught; handed to IOExceptionSupport together with the message
+  * @return the exception produced by IOExceptionSupport.create
+  */
+ protected IOException createWriteException(DataStructure packet, Exception e) {
+     String detail = "Failed to write to journal for: " + packet + ". Reason: " + e;
+     return IOExceptionSupport.create(detail, e);
+ }
+
+ /**
+  * Builds the IOException reported when a named command cannot be written to the journal.
+  *
+  * @param command textual name of the command whose write failed
+  * @param e the failure that was caught; handed to IOExceptionSupport together with the message
+  * @return the exception produced by IOExceptionSupport.create
+  */
+ protected IOException createWriteException(String command, Exception e) {
+     String detail = "Failed to write to journal for command: " + command + ". Reason: " + e;
+     return IOExceptionSupport.create(detail, e);
+ }
+
+ /**
+  * Builds the IOException reported when recovery from the journal fails.
+  *
+  * @param e the failure that was caught; handed to IOExceptionSupport together with the message
+  * @return the exception produced by IOExceptionSupport.create
+  */
+ protected IOException createRecoveryFailedException(Exception e) {
+     String detail = "Failed to recover from journal. Reason: " + e;
+     return IOExceptionSupport.create(detail, e);
+ }
+
+ /**
+  * Marshals a command with the adapter's wire format and writes it to the journal.
+  *
+  * @param command the data structure to marshal and write
+  * @param sync passed straight through to the journal's write call
+  * @return the journal location returned by the write
+  * @throws IOException with the message "closed" when the adapter is not started,
+  *         or whatever marshalling / the journal write throws
+  */
+ public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
+     if (!started.get()) {
+         throw new IOException("closed");
+     }
+     return journal.write(wireFormat.marshal(command), sync);
+ }
+
+ /**
+  * Writes a JournalTrace record carrying the given text to the journal.
+  *
+  * @param message text stored in the trace record
+  * @param sync passed through to {@link #writeCommand(DataStructure, boolean)}
+  * @return the journal location of the trace record
+  * @throws IOException if writeCommand fails
+  */
+ private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
+     JournalTrace entry = new JournalTrace();
+     entry.setMessage(message);
+     RecordLocation written = writeCommand(entry, sync);
+     return written;
+ }
+
+ public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
+ checkpoint(false, true);
+ }
+ }
+
+ /**
+  * @return the transaction store held by this adapter
+  */
+ public RapidTransactionStore getTransactionStore() {
+     return this.transactionStore;
+ }
+
+ /**
+  * Writes a "DELETED" trace record to the journal, moves the journal mark to
+  * that record, and then deletes the backing store when one is set.
+  *
+  * @throws IOException if the journal write fails; any other Throwable raised
+  *         while writing is converted through IOExceptionSupport
+  */
+ public void deleteAllMessages() throws IOException {
+     try {
+         JournalTrace marker = new JournalTrace();
+         marker.setMessage("DELETED");
+         // Written directly to the journal (not through writeCommand), unsynced.
+         RecordLocation markerLocation = journal.write(wireFormat.marshal(marker), false);
+         journal.setMark(markerLocation, true);
+         log.info("Journal deleted: ");
+     } catch (IOException e) {
+         throw e;
+     } catch (Throwable e) {
+         throw IOExceptionSupport.create(e);
+     }
+
+     if (store == null) {
+         return;
+     }
+     store.delete();
+ }
+
+ /**
+  * @return the configured maxCheckpointMessageAddSize value
+  */
+ public int getMaxCheckpointMessageAddSize() {
+     return this.maxCheckpointMessageAddSize;
+ }
+
+ public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+ this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+ }
+
+ /**
+  * @return the configured maxCheckpointWorkers value
+  */
+ public int getMaxCheckpointWorkers() {
+     return this.maxCheckpointWorkers;
+ }
+
+ public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+ this.maxCheckpointWorkers = maxCheckpointWorkers;
+ }
+
+ /**
+  * @return always <code>false</code>; see {@link #setUseExternalMessageReferences(boolean)},
+  *         which rejects enabling this option
+  */
+ public boolean isUseExternalMessageReferences() {
+     final boolean supported = false;
+     return supported;
+ }
+
+ /**
+  * Accepts only <code>false</code>; this adapter does not support message references.
+  *
+  * @param enable must be <code>false</code>
+  * @throws IllegalArgumentException if <code>enable</code> is <code>true</code>
+  */
+ public void setUseExternalMessageReferences(boolean enable) {
+     if (!enable) {
+         return;
+     }
+     throw new IllegalArgumentException("The journal does not support message references.");
+ }
+
+ /**
+  * No-op: the supplied usage manager is neither stored nor used by this method.
+  *
+  * @param usageManager ignored
+  */
+ public void setUsageManager(UsageManager usageManager) {
+     // Nothing to do.
+ }
+
+ /**
+  * @return the Kaha store held by this adapter; may be <code>null</code>
+  *         (deleteAllMessages checks for that)
+  */
+ public Store getStore() {
+     return this.store;
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri May 12 10:52:51 2006
@@ -0,0 +1,294 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A MessageStore that uses a Journal to store it's messages.
+ *
+ * @version $Revision: 1.13 $
+ */
+public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore {
+
+ private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class);
+
+ private HashMap ackedLastAckLocations = new HashMap();
+ private final MapContainer subscriberContainer;
+ private final MapContainer ackContainer;
+ private final Store store;
+ private Map subscriberAcks=new ConcurrentHashMap();
+
+ public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException {
+ super(adapter, destination, messageContainer);
+ this.subscriberContainer = subsContainer;
+ this.ackContainer = ackContainer;
+ this.store=adapter.getStore();
+
+ for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
+ Object key=i.next();
+ addSubscriberAckContainer(key);
+ }
+ }
+
+ public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
+
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ if(list!=null){
+ for(Iterator i=list.iterator();i.hasNext();){
+ Object msg=messageContainer.get(i.next());
+ if(msg!=null){
+ if(msg.getClass()==String.class){
+ listener.recoverMessageReference((String) msg);
+ }else{
+ listener.recoverMessage((Message) msg);
+ }
+ }
+ listener.finished();
+ }
+ } else {
+ listener.finished();
+ }
+
+ }
+
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+ }
+
+ public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+ SubscriptionInfo info=new SubscriptionInfo();
+ info.setDestination(destination);
+ info.setClientId(clientId);
+ info.setSelector(selector);
+ info.setSubcriptionName(subscriptionName);
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ // if already exists - won't add it again as it causes data files
+ // to hang around
+ if(!subscriberContainer.containsKey(key)){
+ subscriberContainer.put(key,info);
+ }
+ addSubscriberAckContainer(key);
+ }
+
+ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+ int subscriberCount=subscriberAcks.size();
+ if(subscriberCount>0){
+ String id=message.getMessageId().toString();
+ ackContainer.put(id,new AtomicInteger(subscriberCount));
+ for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
+ Object key=i.next();
+ ListContainer container=store.getListContainer(key,"durable-subs");
+ container.add(id);
+ }
+ super.addMessage(context,message);
+ }
+ }
+
+
+ /**
+ */
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+ final boolean debug = log.isDebugEnabled();
+
+ JournalTopicAck ack = new JournalTopicAck();
+ ack.setDestination(destination);
+ ack.setMessageId(messageId);
+ ack.setMessageSequenceId(messageId.getBrokerSequenceId());
+ ack.setSubscritionName(subscriptionName);
+ ack.setClientId(clientId);
+ ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
+ final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
+
+ final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ if( !context.isInTransaction() ) {
+ if( debug )
+ log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
+ acknowledge(messageId, location, key);
+ } else {
+ if( debug )
+ log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
+ synchronized (this) {
+ inFlightTxLocations.add(location);
+ }
+ transactionStore.acknowledge(this, ack, location);
+ context.getTransaction().addSynchronization(new Synchronization(){
+ public void afterCommit() throws Exception {
+ if( debug )
+ log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
+ synchronized (RapidTopicMessageStore.this) {
+ inFlightTxLocations.remove(location);
+ acknowledge(messageId, location, key);
+ }
+ }
+ public void afterRollback() throws Exception {
+ if( debug )
+ log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
+ synchronized (RapidTopicMessageStore.this) {
+ inFlightTxLocations.remove(location);
+ }
+ }
+ });
+ }
+
+ }
+
+ public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+ try {
+ synchronized(this) {
+ String subcriberId=getSubscriptionKey(clientId,subscritionName);
+ String id=messageId.toString();
+ ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+ if(container!=null){
+ //container.remove(id);
+ container.removeFirst();
+ AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+ if(count!=null){
+ if(count.decrementAndGet()>0){
+ ackContainer.put(id,count);
+ } else {
+ // no more references to message messageContainer so remove it
+ messageContainer.remove(messageId.toString());
+ }
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
+ }
+ }
+
+
+ /**
+ * @param messageId
+ * @param location
+ * @param key
+ */
+ private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
+ synchronized(this) {
+ lastLocation = location;
+ ackedLastAckLocations.put(key, messageId);
+
+ String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
+ String id=messageId.toString();
+ ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+ if(container!=null){
+ //container.remove(id);
+ container.removeFirst();
+ AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+ if(count!=null){
+ if(count.decrementAndGet()>0){
+ ackContainer.put(id,count);
+ } else {
+ // no more references to message messageContainer so remove it
+ messageContainer.remove(messageId.toString());
+ }
+ }
+ }
+ }
+ }
+
+ protected String getSubscriptionKey(String clientId,String subscriberName){
+ String result=clientId+":";
+ result+=subscriberName!=null?subscriberName:"NOT_SET";
+ return result;
+ }
+
+
+ public RecordLocation checkpoint() throws IOException {
+
+ ArrayList cpAckedLastAckLocations;
+
+ // swap out the hash maps..
+ synchronized (this) {
+ cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values());
+ this.ackedLastAckLocations = new HashMap();
+ }
+
+ RecordLocation rc = super.checkpoint();
+ if(!cpAckedLastAckLocations.isEmpty()) {
+ Collections.sort(cpAckedLastAckLocations);
+ RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0);
+ if( rc == null || t.compareTo(rc)<0 ) {
+ rc = t;
+ }
+ }
+
+ return rc;
+ }
+
+
+ public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ subscriberContainer.remove(key);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ for(Iterator i=list.iterator();i.hasNext();){
+ String id=i.next().toString();
+ AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+ if(count!=null){
+ if(count.decrementAndGet()>0){
+ ackContainer.put(id,count);
+ }else{
+ // no more references to message messageContainer so remove it
+ messageContainer.remove(id);
+ }
+ }
+ }
+ }
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ return (SubscriptionInfo[]) subscriberContainer.values().toArray(
+ new SubscriptionInfo[subscriberContainer.size()]);
+ }
+
+ protected void addSubscriberAckContainer(Object key) throws IOException{
+ ListContainer container=store.getListContainer(key,"topic-subs");
+ Marshaller marshaller=new StringMarshaller();
+ container.setMarshaller(marshaller);
+ subscriberAcks.put(key,container);
+ }
+
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java?rev=405807&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java Fri May 12 10:52:51 2006
@@ -0,0 +1,303 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.rapid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class RapidTransactionStore implements TransactionStore {
+
+ private final RapidPersistenceAdapter peristenceAdapter;
+ ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
+ ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
+ private boolean doingRecover;
+
+
+ public static class TxOperation {
+
+ static final byte ADD_OPERATION_TYPE = 0;
+ static final byte REMOVE_OPERATION_TYPE = 1;
+ static final byte ACK_OPERATION_TYPE = 3;
+
+ public byte operationType;
+ public RapidMessageStore store;
+ public Object data;
+ public RecordLocation location;
+
+ public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation location) {
+ this.operationType=operationType;
+ this.store=store;
+ this.data=data;
+ this.location = location;
+ }
+
+ }
+ /**
+ * Operations
+ * @version $Revision: 1.6 $
+ */
+ public static class Tx {
+
+ private final RecordLocation location;
+ private ArrayList operations = new ArrayList();
+
+ public Tx(RecordLocation location) {
+ this.location=location;
+ }
+
+ public void add(RapidMessageStore store, Message msg, RecordLocation loc) {
+ operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc));
+ }
+
+ public void add(RapidMessageStore store, MessageAck ack, RecordLocation loc) {
+ operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc));
+ }
+
+ public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation loc) {
+ operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc));
+ }
+
+ public Message[] getMessages() {
+ ArrayList list = new ArrayList();
+ for (Iterator iter = operations.iterator(); iter.hasNext();) {
+ TxOperation op = (TxOperation) iter.next();
+ if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
+ list.add(op.data);
+ }
+ }
+ Message rc[] = new Message[list.size()];
+ list.toArray(rc);
+ return rc;
+ }
+
+ public MessageAck[] getAcks() {
+ ArrayList list = new ArrayList();
+ for (Iterator iter = operations.iterator(); iter.hasNext();) {
+ TxOperation op = (TxOperation) iter.next();
+ if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
+ list.add(op.data);
+ }
+ }
+ MessageAck rc[] = new MessageAck[list.size()];
+ list.toArray(rc);
+ return rc;
+ }
+
+ public ArrayList getOperations() {
+ return operations;
+ }
+
+ }
+
+ public RapidTransactionStore(RapidPersistenceAdapter adapter) {
+ this.peristenceAdapter = adapter;
+ }
+
+ /**
+ * @throws IOException
+ * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+ */
+ public void prepare(TransactionId txid) throws IOException {
+ Tx tx = (Tx) inflightTransactions.remove(txid);
+ if (tx == null)
+ return;
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
+ preparedTransactions.put(txid, tx);
+ }
+
+ /**
+ * @throws IOException
+ * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+ */
+ public void replayPrepare(TransactionId txid) throws IOException {
+ Tx tx = (Tx) inflightTransactions.remove(txid);
+ if (tx == null)
+ return;
+ preparedTransactions.put(txid, tx);
+ }
+
+ public Tx getTx(Object txid, RecordLocation location) {
+ Tx tx = (Tx) inflightTransactions.get(txid);
+ if (tx == null) {
+ tx = new Tx(location);
+ inflightTransactions.put(txid, tx);
+ }
+ return tx;
+ }
+
+ /**
+ * @throws XAException
+ * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+ */
+ public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ Tx tx;
+ if (wasPrepared) {
+ tx = (Tx) preparedTransactions.remove(txid);
+ } else {
+ tx = (Tx) inflightTransactions.remove(txid);
+ }
+
+ if (tx == null)
+ return;
+
+ if (txid.isXATransaction()) {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared),
+ true);
+ } else {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared),
+ true);
+ }
+ }
+
+ /**
+ * @throws XAException
+ * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+ */
+ public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
+ if (wasPrepared) {
+ return (Tx) preparedTransactions.remove(txid);
+ } else {
+ return (Tx) inflightTransactions.remove(txid);
+ }
+ }
+
+ /**
+ * @throws IOException
+ * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+ */
+ public void rollback(TransactionId txid) throws IOException {
+
+ Tx tx = (Tx) inflightTransactions.remove(txid);
+ if (tx != null)
+ tx = (Tx) preparedTransactions.remove(txid);
+
+ if (tx != null) {
+ if (txid.isXATransaction()) {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false),
+ true);
+ } else {
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false),
+ true);
+ }
+ }
+
+ }
+
+ /**
+ * @throws IOException
+ * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+ */
+ public void replayRollback(TransactionId txid) throws IOException {
+ if (inflightTransactions.remove(txid) != null)
+ preparedTransactions.remove(txid);
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+ synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+ // All the in-flight transactions get rolled back..
+ inflightTransactions.clear();
+ this.doingRecover = true;
+ try {
+ for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
+ Object txid = (Object) iter.next();
+ Tx tx = (Tx) preparedTransactions.get(txid);
+ listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
+ }
+ } finally {
+ this.doingRecover = false;
+ }
+ }
+
+ /**
+ * @param message
+ * @throws IOException
+ */
+ void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws IOException {
+ Tx tx = getTx(message.getTransactionId(), location);
+ tx.add(store, message, location);
+ }
+
+ /**
+ * @param ack
+ * @throws IOException
+ */
+ public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location) throws IOException {
+ Tx tx = getTx(ack.getTransactionId(), location);
+ tx.add(store, ack, location);
+ }
+
+
+ public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
+ Tx tx = getTx(ack.getTransactionId(), location);
+ tx.add(store, ack, location);
+ }
+
+
+ public RecordLocation checkpoint() throws IOException {
+
+ // Nothing really to checkpoint.. since, we don't
+ // checkpoint tx operations in to long term store until they are committed.
+
+ // But we keep track of the first location of an operation
+ // that was associated with an active tx. The journal can not
+ // roll over active tx records.
+ RecordLocation rc = null;
+ for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
+ Tx tx = (Tx) iter.next();
+ RecordLocation location = tx.location;
+ if (rc == null || rc.compareTo(location) < 0) {
+ rc = location;
+ }
+ }
+ for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
+ Tx tx = (Tx) iter.next();
+ RecordLocation location = tx.location;
+ if (rc == null || rc.compareTo(location) < 0) {
+ rc = location;
+ }
+ }
+ return rc;
+ }
+
+ public boolean isDoingRecover() {
+ return doingRecover;
+ }
+
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
------------------------------------------------------------------------------
svn:executable = *