You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 02:48:22 UTC
svn commit: r492380 [1/2] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/kaha/impl/async/
main/java/org/apache/activemq/store/
main/java/org/apache/activemq/store/kahadaptor/
main/java/org/apache/activemq/store/quick/ t...
Author: chirino
Date: Wed Jan 3 17:48:20 2007
New Revision: 492380
URL: http://svn.apache.org/viewvc?view=rev&rev=492380
Log:
- Big refactor of the QuickJournal:
- Move it to it's own package org.apache.activemq.store.quick
- Brought in all the latest JournalPersistenceAdaptor enhancements
- It now uses the AsyncDataManager as the Journal implemenation which has better read performance
- Instead of forcing all PersistenceAdaptors to support external references, we now move all the message reference methods to a new set of interface class (MesageReferenceAdaptor)
- Enhanced a few Kaha container classes so that they take advantage of Generics
- Added a Kaha based MesageReferenceAdaptor impementation
- Strategy for deleting old journal log files is now in place so that disk space can be reclaimed.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Wed Jan 3 17:48:20 2007
@@ -37,6 +37,7 @@
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -89,12 +90,14 @@
protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+ private Runnable cleanupTask;
+
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if( started ) {
return;
-
}
+
started=true;
directory.mkdirs();
@@ -158,6 +161,12 @@
}
storeState(false);
+
+ cleanupTask = new Runnable(){
+ public void run() {
+ cleanup();
+ }};
+ Scheduler.executePeriodically(cleanupTask, 1000*30);
}
private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
@@ -257,14 +266,24 @@
}
public synchronized void close() throws IOException{
+ if( !started ) {
+ return;
+ }
+ Scheduler.cancel(cleanupTask);
accessorPool.close();
storeState(false);
appender.close();
fileMap.clear();
controlFile.unlock();
controlFile.dispose();
+ started=false;
}
+ private synchronized void cleanup() {
+ if( accessorPool!=null ) {
+ accessorPool.disposeUnused();
+ }
+ }
public synchronized boolean delete() throws IOException{
// Close all open file handles...
@@ -362,6 +381,7 @@
private void removeDataFile(DataFile dataFile) throws IOException{
fileMap.remove(dataFile.getDataFileId());
dataFile.unlink();
+ accessorPool.disposeDataFileAccessors(dataFile);
boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Wed Jan 3 17:48:20 2007
@@ -102,6 +102,21 @@
}
}
+ synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException {
+ if( closed ) {
+ throw new IOException("Closed.");
+ }
+ Pool pool = pools.get(dataFile.getDataFileId());
+ if( pool != null ) {
+ if( !pool.isUsed() ) {
+ pool.dispose();
+ pools.remove(dataFile.getDataFileId());
+ } else {
+ throw new IOException("The data file is still in use: "+dataFile);
+ }
+ }
+ }
+
synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
if( closed ) {
throw new IOException("Closed.");
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Represents a message store which is used by the persistent
+ * implementations
+ *
+ * @version $Revision: 1.5 $
+ */
+public interface ReferenceStore extends MessageStore {
+
+ public class ReferenceData {
+ long expiration;
+ int fileId;
+ int offset;
+
+ public long getExpiration() {
+ return expiration;
+ }
+ public void setExpiration(long expiration) {
+ this.expiration = expiration;
+ }
+ public int getFileId() {
+ return fileId;
+ }
+ public void setFileId(int file) {
+ this.fileId = file;
+ }
+ public int getOffset() {
+ return offset;
+ }
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+ }
+
+ /**
+ * Adds a message reference to the message store
+ */
+ public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
+
+ /**
+ * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
+ * in the missing key if its easy to do so.
+ */
+ public ReferenceData getMessageReference(MessageId identity) throws IOException;
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * Adapter to the actual persistence mechanism used with ActiveMQ
+ *
+ * @version $Revision: 1.3 $
+ */
+public interface ReferenceStoreAdapter extends PersistenceAdapter {
+
+ /**
+ * Factory method to create a new queue message store with the given destination name
+ */
+ public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
+
+ /**
+ * Factory method to create a new topic message store with the given destination name
+ */
+ public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
+
+ public Set<Integer> getReferenceFileIdsInUse() throws IOException;
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+
+/**
+ * A MessageStore for durable topic subscriptions
+ *
+ * @version $Revision: 1.4 $
+ */
+public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
+ /**
+ * Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching
+ * messages from the last checkpoint
+ *
+ * @param context
+ * @param clientId
+ * @param subscriptionName
+ * @param messageId
+ * @param subscriptionPersistentId
+ * @throws IOException
+ */
+ public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
+ throws IOException;
+
+ /**
+ * @param clientId
+ * @param subscriptionName
+ * @param sub
+ * @throws IOException
+ * @throws JMSException
+ */
+ public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
+
+ /**
+ * For the new subscription find the last acknowledged message ID and then find any new messages since then and
+ * dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
+ * then went down before acknowledging any messages, we need to know the correct point from which to recover from.
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @param listener
+ * @param subscription
+ *
+ * @throws Exception
+ */
+ public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+ throws Exception;
+
+ /**
+ * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
+ * messageId <p/>
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @param maxReturned
+ * @param listener
+ *
+ * @throws Exception
+ */
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+ MessageRecoveryListener listener) throws Exception;
+
+ /**
+ * A hint to the Store to reset any batching state for a durable subsriber
+ * @param clientId
+ * @param subscriptionName
+ *
+ */
+ public void resetBatching(String clientId,String subscriptionName);
+
+
+ /**
+ * Get the number of messages ready to deliver from the store to a durable subscriber
+ * @param clientId
+ * @param subscriberName
+ * @return the outstanding message count
+ * @throws IOException
+ */
+ public int getMessageCount(String clientId,String subscriberName) throws IOException;
+
+ /**
+ * Finds the subscriber entry for the given consumer info
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @return the SubscriptionInfo
+ * @throws IOException
+ */
+ public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
+
+ /**
+ * Lists all the durable subscirptions for a given destination.
+ *
+ * @return an array SubscriptionInfos
+ * @throws IOException
+ */
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException;
+
+ /**
+ * Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
+ * is false, then the last message sent to the topic should be set as the last message acknowledged by they new
+ * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
+ * message so that on recovery, all message recorded for the topic get replayed.
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @param selector
+ * @param retroactive
+ * @throws IOException
+ *
+ */
+ public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ throws IOException;
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+
+
+/**
+ * Marshall an Integer
+ * @version $Revision: 1.10 $
+ */
+public class IntegerMarshaller implements Marshaller<Integer> {
+
+ public void writePayload(Integer object,DataOutput dataOut) throws IOException{
+ dataOut.writeInt(object.intValue());
+ }
+
+ public Integer readPayload(DataInput dataIn) throws IOException{
+ return dataIn.readInt();
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Jan 3 17:48:20 2007
@@ -114,8 +114,8 @@
messageContainer.remove(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
- Message msg=(Message)messageContainer.get(entry);
- if(msg.getMessageId().equals(msgId)){
+ MessageId id=getMessageId(messageContainer.get(entry));
+ if(id.equals(msgId)){
messageContainer.remove(entry);
break;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Wed Jan 3 17:48:20 2007
@@ -176,7 +176,7 @@
Store store=getStore();
MapContainer<String, Object> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller());
- container.setValueMarshaller(createMessageMarshaller());
+ container.setValueMarshaller(new CommandMarshaller(wireFormat));
container.load();
return container;
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
+
+public class KahaReferenceStore extends KahaMessageStore implements ReferenceStore {
+
+ private final MapContainer<Integer, Integer> fileReferences;
+
+ public KahaReferenceStore(ListContainer container, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
+ super(container, destination, maximumCacheSize);
+ this.fileReferences = fileReferences;
+ }
+
+ @Override
+ protected MessageId getMessageId(Object object) {
+ return new MessageId(((ReferenceRecord)object).messageId);
+ }
+
+ @Override
+ public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+ throw new RuntimeException("Use addMessageReference instead");
+ }
+
+ @Override
+ public synchronized Message getMessage(MessageId identity) throws IOException {
+ throw new RuntimeException("Use addMessageReference instead");
+ }
+
+ @Override
+ protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
+ ReferenceRecord record = (ReferenceRecord) msg;
+ listener.recoverMessageReference(new MessageId(record.messageId));
+ }
+
+ public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
+ ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+ StoreEntry item=messageContainer.placeLast(record);
+ cache.put(messageId,item);
+ }
+
+ public ReferenceData getMessageReference(MessageId identity) throws IOException {
+
+ ReferenceRecord result=null;
+ StoreEntry entry=cache.get(identity);
+ if(entry!=null){
+ entry = messageContainer.refresh(entry);
+ result = (ReferenceRecord)messageContainer.get(entry);
+ }else{
+ for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+ ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+ if(msg.messageId.equals(identity)){
+ result=msg;
+ cache.put(identity,entry);
+ break;
+ }
+ }
+ }
+ if( result == null )
+ return null;
+ return result.data;
+ }
+
+ public void addReferenceFileIdsInUse(Set<Integer> rc) {
+ for (StoreEntry entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+ ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+ rc.add(msg.data.getFileId());
+ }
+ }
+
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,153 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
+
+public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
+
+ private MapContainer<Integer, Integer> fileReferences;
+
+ public KahaReferenceStoreAdapter(File dir) throws IOException {
+ super(dir);
+ }
+
+ public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+ throw new RuntimeException("Use createQueueReferenceStore instead");
+ }
+
+ public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
+ throw new RuntimeException("Use createTopicReferenceStore instead");
+ }
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+
+ Store store=getStore();
+ fileReferences=store.getMapContainer("file-references");
+ fileReferences.setKeyMarshaller(new IntegerMarshaller());
+ fileReferences.setValueMarshaller(new IntegerMarshaller());
+ fileReferences.load();
+ }
+
+ public static class ReferenceRecord {
+
+ public String messageId;
+ public ReferenceData data;
+
+ public ReferenceRecord() {
+ }
+ public ReferenceRecord(String messageId, ReferenceData data) {
+ this.messageId = messageId;
+ this.data = data;
+ }
+ }
+
+ protected Marshaller<Object> createMessageMarshaller() {
+ return new Marshaller<Object>() {
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
+ ReferenceRecord rr = (ReferenceRecord) object;
+ dataOut.writeUTF(rr.messageId);
+ dataOut.writeInt(rr.data.getFileId());
+ dataOut.writeInt(rr.data.getOffset());
+ dataOut.writeLong(rr.data.getExpiration());
+ }
+ public Object readPayload(DataInput dataIn) throws IOException{
+ ReferenceRecord rr = new ReferenceRecord();
+ rr.messageId = dataIn.readUTF();
+ rr.data = new ReferenceData();
+ rr.data.setFileId(dataIn.readInt());
+ rr.data.setOffset(dataIn.readInt());
+ rr.data.setExpiration(dataIn.readLong());
+ return rr;
+ }
+ };
+ }
+
+ public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
+ ReferenceStore rc=(ReferenceStore)queues.get(destination);
+ if(rc==null){
+ rc=new KahaReferenceStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize, fileReferences);
+ messageStores.put(destination,rc);
+// if(transactionStore!=null){
+// rc=transactionStore.proxy(rc);
+// }
+ queues.put(destination,rc);
+ }
+ return rc;
+ }
+
+ public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
+ TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
+ if(rc==null){
+ Store store=getStore();
+ ListContainer messageContainer=getListContainer(destination,"topic-data");
+ MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+ ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
+ ackContainer.setMarshaller(new TopicSubAckMarshaller());
+ rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize, fileReferences);
+ messageStores.put(destination,rc);
+// if(transactionStore!=null){
+// rc=transactionStore.proxy(rc);
+// }
+ topics.put(destination,rc);
+ }
+ return rc;
+ }
+
+ public Set<Integer> getReferenceFileIdsInUse() throws IOException {
+
+ Set<Integer> rc = new HashSet<Integer>();
+
+ Set<ActiveMQDestination> destinations = getDestinations();
+ for (ActiveMQDestination destination : destinations) {
+ if( destination.isQueue() ) {
+ KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
+ store.addReferenceFileIdsInUse(rc);
+ } else {
+ KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
+ store.addReferenceFileIdsInUse(rc);
+ }
+ }
+
+ return rc;
+
+ }
+
+
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Wed Jan 3 17:48:20 2007
@@ -30,6 +30,7 @@
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
/**
* @version $Revision: 1.5 $
@@ -54,6 +55,7 @@
}
}
+ @Override
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
+
+public class KahaTopicReferenceStore extends KahaTopicMessageStore implements TopicReferenceStore {
+
+ private final MapContainer<Integer, Integer> fileReferences;
+
+ public KahaTopicReferenceStore(Store store, ListContainer messageContainer, ListContainer ackContainer, MapContainer subsContainer, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
+ super(store, messageContainer, ackContainer, subsContainer, destination, maximumCacheSize);
+ this.fileReferences = fileReferences;
+ }
+
+ @Override
+ protected MessageId getMessageId(Object object) {
+ return new MessageId(((ReferenceRecord)object).messageId);
+ }
+
+ @Override
+ public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+ throw new RuntimeException("Use addMessageReference instead");
+ }
+
+ @Override
+ public synchronized Message getMessage(MessageId identity) throws IOException {
+ throw new RuntimeException("Use addMessageReference instead");
+ }
+
+ @Override
+ protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
+ ReferenceRecord record = (ReferenceRecord) msg;
+ listener.recoverMessageReference(new MessageId(record.messageId));
+ }
+
+ public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
+
+ ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+
+ int subscriberCount=subscriberMessages.size();
+ if(subscriberCount>0){
+ StoreEntry messageEntry=messageContainer.placeLast(record);
+ TopicSubAck tsa=new TopicSubAck();
+ tsa.setCount(subscriberCount);
+ tsa.setMessageEntry(messageEntry);
+ StoreEntry ackEntry=ackContainer.placeLast(tsa);
+ for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+ TopicSubContainer container=(TopicSubContainer)i.next();
+ ConsumerMessageRef ref=new ConsumerMessageRef();
+ ref.setAckEntry(ackEntry);
+ ref.setMessageEntry(messageEntry);
+ container.add(ref);
+ }
+ }
+
+ }
+
+ public ReferenceData getMessageReference(MessageId identity) throws IOException {
+
+ ReferenceRecord result=null;
+ StoreEntry entry=(StoreEntry)cache.get(identity);
+ if(entry!=null){
+ entry = messageContainer.refresh(entry);
+ result = (ReferenceRecord)messageContainer.get(entry);
+ }else{
+ for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+ ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
+ if(msg.messageId.equals(identity)){
+ result=msg;
+ cache.put(identity,entry);
+ break;
+ }
+ }
+ }
+ if( result == null )
+ return null;
+ return result.data;
+ }
+
+ public void addReferenceFileIdsInUse(Set<Integer> rc) {
+ for (StoreEntry entry = ackContainer.getFirst();entry != null; entry = ackContainer.getNext(entry)) {
+ TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
+ if( subAck.getCount() > 0 ) {
+ ReferenceRecord rr = (ReferenceRecord)messageContainer.get(subAck.getMessageEntry());
+ rc.add(rr.data.getFileId());
+ }
+ }
+ }
+
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.quick;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.TransactionTemplate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store it's messages.
+ *
+ * @version $Revision: 1.14 $
+ */
+public class QuickMessageStore implements MessageStore {
+
+ private static final Log log = LogFactory.getLog(QuickMessageStore.class);
+
+ protected final QuickPersistenceAdapter peristenceAdapter;
+ protected final QuickTransactionStore transactionStore;
+ protected final ReferenceStore referenceStore;
+ protected final ActiveMQDestination destination;
+ protected final TransactionTemplate transactionTemplate;
+
+ private LinkedHashMap<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
+ private ArrayList<MessageAck> messageAcks = new ArrayList<MessageAck>();
+
+ /** A MessageStore that we can use to retrieve messages quickly. */
+ private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
+
+ protected Location lastLocation;
+ protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
+
+ public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+ this.peristenceAdapter = adapter;
+ this.transactionStore = adapter.getTransactionStore();
+ this.referenceStore = referenceStore;
+ this.destination = destination;
+ this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+ }
+
+ public void setUsageManager(UsageManager usageManager) {
+ referenceStore.setUsageManager(usageManager);
+ }
+
+
+ /**
+ * Not synchronized since the Journal has better throughput if you increase
+ * the number of concurrent writes that it is doing.
+ */
+ public void addMessage(ConnectionContext context, final Message message) throws IOException {
+
+ final MessageId id = message.getMessageId();
+
+ final boolean debug = log.isDebugEnabled();
+
+ final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+ if( !context.isInTransaction() ) {
+ if( debug )
+ log.debug("Journalled message add for: "+id+", at: "+location);
+ addMessage(message, location);
+ } else {
+ if( debug )
+ log.debug("Journalled transacted message add for: "+id+", at: "+location);
+ synchronized( this ) {
+ inFlightTxLocations.add(location);
+ }
+ transactionStore.addMessage(this, message, location);
+ context.getTransaction().addSynchronization(new Synchronization(){
+ public void afterCommit() throws Exception {
+ if( debug )
+ log.debug("Transacted message add commit for: "+id+", at: "+location);
+ synchronized( QuickMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ addMessage(message, location);
+ }
+ }
+ public void afterRollback() throws Exception {
+ if( debug )
+ log.debug("Transacted message add rollback for: "+id+", at: "+location);
+ synchronized( QuickMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ }
+ }
+ });
+ }
+ }
+
+ private void addMessage(final Message message, final Location location) {
+ ReferenceData data = new ReferenceData();
+ data.setExpiration(message.getExpiration());
+ data.setFileId(location.getDataFileId());
+ data.setOffset(location.getOffset());
+ synchronized (this) {
+ lastLocation = location;
+ messages.put(message.getMessageId(), data);
+ }
+ }
+
+ public void replayAddMessage(ConnectionContext context, Message message, Location location) {
+ MessageId id = message.getMessageId();
+ try {
+ // Only add the message if it has not already been added.
+ ReferenceData data = referenceStore.getMessageReference(id);
+ if( data==null ) {
+ data = new ReferenceData();
+ data.setExpiration(message.getExpiration());
+ data.setFileId(location.getDataFileId());
+ data.setOffset(location.getOffset());
+ referenceStore.addMessageReference(context, id, data);
+ }
+ }
+ catch (Throwable e) {
+ log.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e,e);
+ }
+ }
+
+ /**
+ */
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ final boolean debug = log.isDebugEnabled();
+ JournalQueueAck remove = new JournalQueueAck();
+ remove.setDestination(destination);
+ remove.setMessageAck(ack);
+
+ final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+ if( !context.isInTransaction() ) {
+ if( debug )
+ log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+ removeMessage(ack, location);
+ } else {
+ if( debug )
+ log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
+ synchronized( this ) {
+ inFlightTxLocations.add(location);
+ }
+ transactionStore.removeMessage(this, ack, location);
+ context.getTransaction().addSynchronization(new Synchronization(){
+ public void afterCommit() throws Exception {
+ if( debug )
+ log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
+ synchronized( QuickMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ removeMessage(ack, location);
+ }
+ }
+ public void afterRollback() throws Exception {
+ if( debug )
+ log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
+ synchronized( QuickMessageStore.this ) {
+ inFlightTxLocations.remove(location);
+ }
+ }
+ });
+
+ }
+ }
+
+ private void removeMessage(final MessageAck ack, final Location location) {
+ synchronized (this) {
+ lastLocation = location;
+ MessageId id = ack.getLastMessageId();
+ ReferenceData data = messages.remove(id);
+ if (data == null) {
+ messageAcks.add(ack);
+ }
+ }
+ }
+
+ public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
+ try {
+ // Only remove the message if it has not already been removed.
+ ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
+ if( t!=null ) {
+ referenceStore.removeMessage(context, messageAck);
+ }
+ }
+ catch (Throwable e) {
+ log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
+ }
+ }
+
+ /**
+ * @return
+ * @throws IOException
+ */
+ public Location checkpoint() throws IOException {
+ return checkpoint(null);
+ }
+
+ /**
+ * @return
+ * @throws IOException
+ */
+ public Location checkpoint(final Callback postCheckpointTest) throws IOException {
+
+ final ArrayList<MessageAck> cpRemovedMessageLocations;
+ final ArrayList<Location> cpActiveJournalLocations;
+ final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
+
+ // swap out the message hash maps..
+ synchronized (this) {
+ cpAddedMessageIds = this.messages;
+ cpRemovedMessageLocations = this.messageAcks;
+ cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
+ this.messages = new LinkedHashMap<MessageId, ReferenceData>();
+ this.messageAcks = new ArrayList<MessageAck>();
+ }
+
+ transactionTemplate.run(new Callback() {
+ public void execute() throws Exception {
+
+ int size = 0;
+
+ PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
+ ConnectionContext context = transactionTemplate.getContext();
+
+ // Checkpoint the added messages.
+ Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<MessageId, ReferenceData> entry = iterator.next();
+ try {
+ referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
+ } catch (Throwable e) {
+ log.warn("Message could not be added to long term store: " + e.getMessage(), e);
+ }
+
+ size ++;
+
+ // Commit the batch if it's getting too big
+ if( size >= maxCheckpointMessageAddSize ) {
+ persitanceAdapter.commitTransaction(context);
+ persitanceAdapter.beginTransaction(context);
+ size=0;
+ }
+ }
+
+ persitanceAdapter.commitTransaction(context);
+ persitanceAdapter.beginTransaction(context);
+
+ // Checkpoint the removed messages.
+ for (MessageAck ack : cpRemovedMessageLocations) {
+ try {
+ referenceStore.removeMessage(transactionTemplate.getContext(), ack);
+ } catch (Throwable e) {
+ log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
+ }
+ }
+
+ if( postCheckpointTest!= null ) {
+ postCheckpointTest.execute();
+ }
+ }
+
+ });
+
+ synchronized (this) {
+ cpAddedMessageIds = null;
+ }
+
+ if( cpActiveJournalLocations.size() > 0 ) {
+ Collections.sort(cpActiveJournalLocations);
+ return cpActiveJournalLocations.get(0);
+ } else {
+ return lastLocation;
+ }
+ }
+
+ /**
+ *
+ */
+ public Message getMessage(MessageId identity) throws IOException {
+
+ ReferenceData data=null;
+
+ synchronized (this) {
+ // Is it still in flight???
+ data = messages.get(identity);
+ if( data==null && cpAddedMessageIds!=null ) {
+ data = cpAddedMessageIds.get(identity);
+ }
+ }
+
+ if( data==null ) {
+ data = referenceStore.getMessageReference(identity);
+ }
+
+ if( data==null ) {
+ return null;
+ }
+
+ Message answer = null;
+ if (answer != null ) {
+ return answer;
+ }
+
+ Location location = new Location();
+ location.setDataFileId(data.getFileId());
+ location.setOffset(data.getOffset());
+ return (Message) peristenceAdapter.readCommand(location);
+ }
+
+ /**
+ * Replays the checkpointStore first as those messages are the oldest ones,
+ * then messages are replayed from the transaction log and then the cache is
+ * updated.
+ *
+ * @param listener
+ * @throws Exception
+ */
+ public void recover(final MessageRecoveryListener listener) throws Exception {
+ peristenceAdapter.checkpoint(true);
+ referenceStore.recover(new RecoveryListenerAdapter(this, listener));
+ }
+
+ public void start() throws Exception {
+ referenceStore.start();
+ }
+
+ public void stop() throws Exception {
+ referenceStore.stop();
+ }
+
+ /**
+ * @return Returns the longTermStore.
+ */
+ public ReferenceStore getReferenceStore() {
+ return referenceStore;
+ }
+
+ /**
+ * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
+ */
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ peristenceAdapter.checkpoint(true);
+ referenceStore.removeAllMessages(context);
+ }
+
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+ throw new IOException("The journal does not support message references.");
+ }
+
+ public String getMessageReference(MessageId identity) throws IOException {
+ throw new IOException("The journal does not support message references.");
+ }
+
+ /**
+ * @return
+ * @throws IOException
+ * @see org.apache.activemq.store.MessageStore#getMessageCount()
+ */
+ public int getMessageCount() throws IOException{
+ peristenceAdapter.checkpoint(true);
+ return referenceStore.getMessageCount();
+ }
+
+
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ peristenceAdapter.checkpoint(true);
+ referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener));
+
+ }
+
+
+ public void resetBatching(){
+ referenceStore.resetBatching();
+
+ }
+
+}
\ No newline at end of file
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Wed Jan 3 17:48:20 2007
@@ -0,0 +1,699 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.quick;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
+import org.apache.activemq.store.quick.QuickTransactionStore.Tx;
+import org.apache.activemq.store.quick.QuickTransactionStore.TxOperation;
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.17 $
+ */
+public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListener {
+
+ private static final Log log = LogFactory.getLog(QuickPersistenceAdapter.class);
+
+ private final ConcurrentHashMap<ActiveMQQueue, QuickMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, QuickMessageStore>();
+ private final ConcurrentHashMap<ActiveMQTopic, QuickMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, QuickMessageStore>();
+
+ private AsyncDataManager asyncDataManager;
+ private ReferenceStoreAdapter referenceStoreAdapter;
+ private TaskRunnerFactory taskRunnerFactory;
+ private WireFormat wireFormat = new OpenWireFormat();
+
+ private UsageManager usageManager;
+ private long checkpointInterval = 1000 * 30;
+ private int maxCheckpointWorkers = 1;
+ private int maxCheckpointMessageAddSize = 1024*4;
+
+ private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
+ private ThreadPoolExecutor checkpointExecutor;
+
+ private TaskRunner checkpointTask;
+ private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private Runnable periodicCheckpointTask;
+
+ private Runnable periodicCleanupTask;
+ private boolean deleteAllMessages;
+ private File directory = new File("activemq-data/quick");
+
+
+ public synchronized void start() throws Exception {
+ if( !started.compareAndSet(false, true) )
+ return;
+ this.usageManager.addUsageListener(this);
+
+ if( asyncDataManager == null ) {
+ asyncDataManager = createAsyncDataManager();
+ }
+
+ if( referenceStoreAdapter==null ) {
+ referenceStoreAdapter = createReferenceStoreAdapter();
+ }
+ referenceStoreAdapter.setUsageManager(usageManager);
+
+ if( taskRunnerFactory==null ) {
+ taskRunnerFactory = createTaskRunnerFactory();
+ }
+
+ asyncDataManager.start();
+ if( deleteAllMessages ) {
+ asyncDataManager.delete();
+ try {
+ JournalTrace trace = new JournalTrace();
+ trace.setMessage("DELETED "+new Date());
+ Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
+ asyncDataManager.setMark(location, true);
+ log.info("Journal deleted: ");
+ deleteAllMessages=false;
+ } catch (IOException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create(e);
+ }
+
+ referenceStoreAdapter.deleteAllMessages();
+ }
+ referenceStoreAdapter.start();
+
+ Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
+ for (Integer fileId : files) {
+ asyncDataManager.addInterestInFile(fileId);
+ }
+
+ checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+ public boolean iterate() {
+ doCheckpoint();
+ return false;
+ }
+ }, "ActiveMQ Journal Checkpoint Worker");
+
+ checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runable) {
+ Thread t = new Thread(runable, "Journal checkpoint worker");
+ t.setPriority(7);
+ return t;
+ }
+ });
+
+ createTransactionStore();
+ recover();
+
+ // Do a checkpoint periodically.
+ periodicCheckpointTask = new Runnable() {
+ public void run() {
+ checkpoint(false);
+ }
+ };
+
+ Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
+
+ periodicCleanupTask = new Runnable() {
+ public void run() {
+ cleanup();
+ }
+ };
+ Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval);
+
+ }
+
+
+ public void stop() throws Exception {
+
+ if( !started.compareAndSet(true, false) )
+ return;
+
+ this.usageManager.removeUsageListener(this);
+ Scheduler.cancel(periodicCheckpointTask);
+
+ // Take one final checkpoint and stop checkpoint processing.
+ checkpoint(true);
+ checkpointTask.shutdown();
+ log.debug("Checkpoint task shutdown");
+ checkpointExecutor.shutdown();
+
+ queues.clear();
+ topics.clear();
+
+ IOException firstException = null;
+ referenceStoreAdapter.stop();
+ try {
+ log.debug("Journal close");
+ asyncDataManager.close();
+ } catch (Exception e) {
+ firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+ }
+
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+
+
+ /**
+ * When we checkpoint we move all the journalled data to long term storage.
+ * @param stopping
+ *
+ * @param b
+ */
+ public void checkpoint(boolean sync) {
+ try {
+ if (asyncDataManager == null )
+ throw new IllegalStateException("Journal is closed.");
+
+ CountDownLatch latch = null;
+ synchronized(this) {
+ latch = nextCheckpointCountDownLatch;
+ }
+
+ checkpointTask.wakeup();
+
+ if (sync) {
+ log.debug("Waitng for checkpoint to complete.");
+ latch.await();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Request to start checkpoint failed: " + e, e);
+ }
+ }
+
+ /**
+ * This does the actual checkpoint.
+ * @return
+ */
+ public boolean doCheckpoint() {
+ CountDownLatch latch = null;
+ synchronized(this) {
+ latch = nextCheckpointCountDownLatch;
+ nextCheckpointCountDownLatch = new CountDownLatch(1);
+ }
+ try {
+
+ log.debug("Checkpoint started.");
+ Location newMark = null;
+
+ ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(queues.size()+topics.size());
+
+ //
+ Iterator<QuickMessageStore> iterator = queues.values().iterator();
+ while (iterator.hasNext()) {
+ try {
+ final QuickMessageStore ms = iterator.next();
+ FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
+ public Location call() throws Exception {
+ return ms.checkpoint();
+ }});
+ futureTasks.add(task);
+ checkpointExecutor.execute(task);
+ }
+ catch (Exception e) {
+ log.error("Failed to checkpoint a message store: " + e, e);
+ }
+ }
+
+ iterator = topics.values().iterator();
+ while (iterator.hasNext()) {
+ try {
+ final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
+ FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
+ public Location call() throws Exception {
+ return ms.checkpoint();
+ }});
+ futureTasks.add(task);
+ checkpointExecutor.execute(task);
+ }
+ catch (Exception e) {
+ log.error("Failed to checkpoint a message store: " + e, e);
+ }
+ }
+
+ try {
+ for (Iterator<FutureTask> iter = futureTasks.iterator(); iter.hasNext();) {
+ FutureTask ft = iter.next();
+ Location mark = (Location) ft.get();
+ // We only set a newMark on full checkpoints.
+ if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+ newMark = mark;
+ }
+ }
+ } catch (Throwable e) {
+ log.error("Failed to checkpoint a message store: " + e, e);
+ }
+
+
+ try {
+ if (newMark != null) {
+ log.debug("Marking journal at: " + newMark);
+ asyncDataManager.setMark(newMark, false);
+ writeTraceMessage("CHECKPOINT "+new Date(), true);
+ }
+ }
+ catch (Exception e) {
+ log.error("Failed to mark the Journal: " + e, e);
+ }
+
+// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
+// // We may be check pointing more often than the checkpointInterval if under high use
+// // But we don't want to clean up the db that often.
+// long now = System.currentTimeMillis();
+// if( now > lastCleanup+checkpointInterval ) {
+// lastCleanup = now;
+// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
+// }
+// }
+
+ log.debug("Checkpoint done.");
+ }
+ finally {
+ latch.countDown();
+ }
+ return true;
+ }
+
+ /**
+ * Cleans up the data files
+ * @return
+ * @throws IOException
+ */
+ public void cleanup() {
+
+ try {
+
+ Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
+ asyncDataManager.consolidateDataFilesNotIn(inUse);
+
+ } catch (IOException e) {
+ log.error("Could not cleanup data files: "+e, e);
+ }
+
+ }
+
+
+ public Set<ActiveMQDestination> getDestinations() {
+ Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
+ destinations.addAll(queues.keySet());
+ destinations.addAll(topics.keySet());
+ return destinations;
+ }
+
+ private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+ if (destination.isQueue()) {
+ return createQueueMessageStore((ActiveMQQueue) destination);
+ }
+ else {
+ return createTopicMessageStore((ActiveMQTopic) destination);
+ }
+ }
+
+ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+ QuickMessageStore store = queues.get(destination);
+ if (store == null) {
+ ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
+ store = new QuickMessageStore(this, checkpointStore, destination);
+ queues.put(destination, store);
+ }
+ return store;
+ }
+
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
+ QuickTopicMessageStore store = (QuickTopicMessageStore) topics.get(destinationName);
+ if (store == null) {
+ TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
+ store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
+ topics.put(destinationName, store);
+ }
+ return store;
+ }
+
+ public TransactionStore createTransactionStore() throws IOException {
+ return transactionStore;
+ }
+
+ public long getLastMessageBrokerSequenceId() throws IOException {
+ return referenceStoreAdapter.getLastMessageBrokerSequenceId();
+ }
+
+ public void beginTransaction(ConnectionContext context) throws IOException {
+ referenceStoreAdapter.beginTransaction(context);
+ }
+
+ public void commitTransaction(ConnectionContext context) throws IOException {
+ referenceStoreAdapter.commitTransaction(context);
+ }
+
+ public void rollbackTransaction(ConnectionContext context) throws IOException {
+ referenceStoreAdapter.rollbackTransaction(context);
+ }
+
+
+ /**
+ * @param location
+ * @return
+ * @throws IOException
+ */
+ public DataStructure readCommand(Location location) throws IOException {
+ try {
+ ByteSequence packet = asyncDataManager.read(location);
+ return (DataStructure) wireFormat.unmarshal(packet);
+ } catch (IOException e) {
+ throw createReadException(location, e);
+ }
+ }
+
+ /**
+ * Move all the messages that were in the journal into long term storage. We
+ * just replay and do a checkpoint.
+ *
+ * @throws IOException
+ * @throws IOException
+ * @throws InvalidLocationException
+ * @throws IllegalStateException
+ */
+ private void recover() throws IllegalStateException, IOException, IOException {
+
+ Location pos = null;
+ int transactionCounter = 0;
+
+ log.info("Journal Recovery Started from: " + asyncDataManager);
+ ConnectionContext context = new ConnectionContext();
+
+ // While we have records in the journal.
+ while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+ ByteSequence data = asyncDataManager.read(pos);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+ if (c instanceof Message ) {
+ Message message = (Message) c;
+ QuickMessageStore store = (QuickMessageStore) createMessageStore(message.getDestination());
+ if ( message.isInTransaction()) {
+ transactionStore.addMessage(store, message, pos);
+ }
+ else {
+ store.replayAddMessage(context, message, pos);
+ transactionCounter++;
+ }
+ } else {
+ switch (c.getDataStructureType()) {
+ case JournalQueueAck.DATA_STRUCTURE_TYPE:
+ {
+ JournalQueueAck command = (JournalQueueAck) c;
+ QuickMessageStore store = (QuickMessageStore) createMessageStore(command.getDestination());
+ if (command.getMessageAck().isInTransaction()) {
+ transactionStore.removeMessage(store, command.getMessageAck(), pos);
+ }
+ else {
+ store.replayRemoveMessage(context, command.getMessageAck());
+ transactionCounter++;
+ }
+ }
+ break;
+ case JournalTopicAck.DATA_STRUCTURE_TYPE:
+ {
+ JournalTopicAck command = (JournalTopicAck) c;
+ QuickTopicMessageStore store = (QuickTopicMessageStore) createMessageStore(command.getDestination());
+ if (command.getTransactionId() != null) {
+ transactionStore.acknowledge(store, command, pos);
+ }
+ else {
+ store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
+ transactionCounter++;
+ }
+ }
+ break;
+ case JournalTransaction.DATA_STRUCTURE_TYPE:
+ {
+ JournalTransaction command = (JournalTransaction) c;
+ try {
+ // Try to replay the packet.
+ switch (command.getType()) {
+ case JournalTransaction.XA_PREPARE:
+ transactionStore.replayPrepare(command.getTransactionId());
+ break;
+ case JournalTransaction.XA_COMMIT:
+ case JournalTransaction.LOCAL_COMMIT:
+ Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+ if (tx == null)
+ break; // We may be trying to replay a commit that
+ // was already committed.
+
+ // Replay the committed operations.
+ tx.getOperations();
+ for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+ TxOperation op = (TxOperation) iter.next();
+ if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
+ op.store.replayAddMessage(context, (Message)op.data, pos);
+ }
+ if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
+ op.store.replayRemoveMessage(context, (MessageAck) op.data);
+ }
+ if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
+ JournalTopicAck ack = (JournalTopicAck) op.data;
+ ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
+ .getMessageId());
+ }
+ }
+ transactionCounter++;
+ break;
+ case JournalTransaction.LOCAL_ROLLBACK:
+ case JournalTransaction.XA_ROLLBACK:
+ transactionStore.replayRollback(command.getTransactionId());
+ break;
+ }
+ }
+ catch (IOException e) {
+ log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+ }
+ }
+ break;
+ case JournalTrace.DATA_STRUCTURE_TYPE:
+ JournalTrace trace = (JournalTrace) c;
+ log.debug("TRACE Entry: " + trace.getMessage());
+ break;
+ default:
+ log.error("Unknown type of record in transaction log which will be discarded: " + c);
+ }
+ }
+ }
+
+ Location location = writeTraceMessage("RECOVERED "+new Date(), true);
+ asyncDataManager.setMark(location, true);
+
+ log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+ }
+
+ private IOException createReadException(Location location, Exception e) {
+ return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
+ }
+
+ protected IOException createWriteException(DataStructure packet, Exception e) {
+ return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
+ }
+
+ protected IOException createWriteException(String command, Exception e) {
+ return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
+ }
+
+ protected IOException createRecoveryFailedException(Exception e) {
+ return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
+ }
+
+ /**
+ *
+ * @param command
+ * @param sync
+ * @return
+ * @throws IOException
+ */
+ public Location writeCommand(DataStructure command, boolean sync) throws IOException {
+ return asyncDataManager.write(wireFormat.marshal(command), sync);
+ }
+
+ private Location writeTraceMessage(String message, boolean sync) throws IOException {
+ JournalTrace trace = new JournalTrace();
+ trace.setMessage(message);
+ return writeCommand(trace, sync);
+ }
+
+ public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ newPercentUsage = ((newPercentUsage)/10)*10;
+ oldPercentUsage = ((oldPercentUsage)/10)*10;
+ if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
+ boolean sync = newPercentUsage >= 90;
+ checkpoint(sync);
+ }
+ }
+
+ public QuickTransactionStore getTransactionStore() {
+ return transactionStore;
+ }
+
+ public void deleteAllMessages() throws IOException {
+ deleteAllMessages=true;
+ }
+
+
+
+ public String toString(){
+ return "JournalPersistenceAdapator(" + referenceStoreAdapter + ")";
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Subclass overridables
+ ///////////////////////////////////////////////////////////////////
+ protected AsyncDataManager createAsyncDataManager() {
+ AsyncDataManager manager = new AsyncDataManager();
+ manager.setDirectory(new File(directory, "journal"));
+ return manager;
+ }
+
+ protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
+ KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory);
+ return adaptor;
+ }
+
+ protected TaskRunnerFactory createTaskRunnerFactory() {
+ return DefaultThreadPools.getDefaultTaskRunnerFactory();
+ }
+
+
+ ///////////////////////////////////////////////////////////////////
+ // Property Accessors
+ ///////////////////////////////////////////////////////////////////
+
+ public AsyncDataManager getAsyncDataManager() {
+ return asyncDataManager;
+ }
+ public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+ this.asyncDataManager = asyncDataManager;
+ }
+
+ public ReferenceStoreAdapter getReferenceStoreAdapter() {
+ return referenceStoreAdapter;
+ }
+ public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+ this.referenceStoreAdapter = referenceStoreAdapter;
+ }
+
+ public TaskRunnerFactory getTaskRunnerFactory() {
+ return taskRunnerFactory;
+ }
+ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+ this.taskRunnerFactory = taskRunnerFactory;
+ }
+
+ /**
+ * @return Returns the wireFormat.
+ */
+ public WireFormat getWireFormat() {
+ return wireFormat;
+ }
+ public void setWireFormat(WireFormat wireFormat) {
+ this.wireFormat = wireFormat;
+ }
+
+ public UsageManager getUsageManager() {
+ return usageManager;
+ }
+ public void setUsageManager(UsageManager usageManager) {
+ this.usageManager = usageManager;
+ }
+
+ public int getMaxCheckpointMessageAddSize() {
+ return maxCheckpointMessageAddSize;
+ }
+ public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+ this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+ }
+
+ public int getMaxCheckpointWorkers() {
+ return maxCheckpointWorkers;
+ }
+ public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+ this.maxCheckpointWorkers = maxCheckpointWorkers;
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+}