You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/06 11:25:50 UTC
svn commit: r515054 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./
amq/ jdbc/ journal/ kahadaptor/ memory/ quick/ rapid/
Author: rajdavies
Date: Tue Mar 6 02:25:48 2007
New Revision: 515054
URL: http://svn.apache.org/viewvc?view=rev&rev=515054
Log:
Deleted store implementations rapid and quick, as they are replaced by the AMQStore implementation
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (with props)
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Tue Mar 6 02:25:48 2007
@@ -24,6 +24,7 @@
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
+import java.io.File;
import java.io.IOException;
import java.util.Set;
@@ -38,22 +39,30 @@
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
* objects that the persistence store is aware exist.
*
- * @return
+ * @return active destinations
*/
public Set<ActiveMQDestination> getDestinations();
/**
* Factory method to create a new queue message store with the given destination name
+ * @param destination
+ * @return the message store
+ * @throws IOException
*/
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
/**
* Factory method to create a new topic message store with the given destination name
+ * @param destination
+ * @return the topic message store
+ * @throws IOException
*/
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
/**
* Factory method to create a new persistent prepared transaction store for XA recovery
+ * @return transaction store
+ * @throws IOException
*/
public TransactionStore createTransactionStore() throws IOException;
@@ -66,27 +75,33 @@
* real high performance its usually faster to perform many writes within the same
* transaction to minimize latency caused by disk synchronization. This is especially
* true when using tools like Berkeley Db or embedded JDBC servers.
+ * @param context
+ * @throws IOException
*/
public void beginTransaction(ConnectionContext context) throws IOException;
/**
* Commit a persistence transaction
+ * @param context
+ * @throws IOException
*
- * @see PersistenceAdapter#beginTransaction()
+ * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
public void commitTransaction(ConnectionContext context) throws IOException;
/**
* Rollback a persistence transaction
+ * @param context
+ * @throws IOException
*
- * @see PersistenceAdapter#beginTransaction()
+ * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
public void rollbackTransaction(ConnectionContext context) throws IOException;
/**
*
- * @return
+ * @return last broker sequence
* @throws IOException
*/
public long getLastMessageBrokerSequenceId() throws IOException;
@@ -102,4 +117,24 @@
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager);
+
+ /**
+ * Set the name of the broker using the adapter
+ * @param brokerName
+ */
+ public void setBrokerName(String brokerName);
+
+ /**
+ * Set the directory where any data files should be created
+ * @param dir
+ */
+ public void setDirectory(File dir);
+
+ /**
+ * Checkpoint any outstanding data
+ * @param sync
+ * @throws IOException
+ *
+ */
+ public void checkpoint(boolean sync) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java Tue Mar 6 02:25:48 2007
@@ -17,6 +17,7 @@
*/
package org.apache.activemq.store;
+import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
import org.springframework.beans.factory.FactoryBean;
/**
@@ -26,7 +27,7 @@
*
* @version $Revision: 1.1 $
*/
-public class PersistenceAdapterFactoryBean extends DefaultPersistenceAdapterFactory implements FactoryBean {
+public class PersistenceAdapterFactoryBean extends JournalPersistenceAdapterFactory implements FactoryBean {
private PersistenceAdapter persistenceAdaptor;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Tue Mar 6 02:25:48 2007
@@ -1,20 +1,17 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.store.amq;
import java.io.File;
@@ -26,7 +23,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -54,7 +50,6 @@
import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
-
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
@@ -68,261 +63,231 @@
import org.apache.commons.logging.LogFactory;
/**
- * An implementation of {@link PersistenceAdapter} designed for use with a
- * {@link Journal} and then check pointing asynchronously on a timeout with some
- * other long term persistent storage.
+ * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
+ * asynchronously on a timeout with some other long term persistent storage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
-public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener {
+public class AMQPersistenceAdapter implements PersistenceAdapter,UsageListener{
- private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
-
- private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
- private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
-
+ private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class);
+ private final ConcurrentHashMap<ActiveMQQueue,AMQMessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,AMQMessageStore>();
+ private final ConcurrentHashMap<ActiveMQTopic,AMQMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,AMQMessageStore>();
private AsyncDataManager asyncDataManager;
- private KahaReferenceStoreAdapter referenceStoreAdapter;
- private TaskRunnerFactory taskRunnerFactory;
- private WireFormat wireFormat = new OpenWireFormat();
-
+ private ReferenceStoreAdapter referenceStoreAdapter;
+ private TaskRunnerFactory taskRunnerFactory;
+ private WireFormat wireFormat=new OpenWireFormat();
private UsageManager usageManager;
-
- private long cleanupInterval = 1000 * 60;
- private long checkpointInterval = 1000 * 10;
-
- private int maxCheckpointWorkers = 1;
- private int maxCheckpointMessageAddSize = 1024*4;
-
- private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
-
+ private long cleanupInterval=1000*60;
+ private long checkpointInterval=1000*10;
+ private int maxCheckpointWorkers=1;
+ private int maxCheckpointMessageAddSize=1024*4;
+ private AMQTransactionStore transactionStore=new AMQTransactionStore(this);
private TaskRunner checkpointTask;
- private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
-
- private final AtomicBoolean started = new AtomicBoolean(false);
+ private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
+ private final AtomicBoolean started=new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
-
- private Runnable periodicCleanupTask;
- private boolean deleteAllMessages;
+ private Runnable periodicCleanupTask;
+ private boolean deleteAllMessages;
private boolean syncOnWrite;
- private String brokerName;
- private File directory;
-
+ private String brokerName="";
+ private File directory;
- public AMQPersistenceAdapter() {
- this("localhost");
+ public String getBrokerName(){
+ return this.brokerName;
+ }
+
+ public void setBrokerName(String brokerName){
+ this.brokerName=brokerName;
+ if(this.referenceStoreAdapter!=null){
+ this.referenceStoreAdapter.setBrokerName(brokerName);
}
- public AMQPersistenceAdapter(String brokerName) {
- this.brokerName = brokerName;
- this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName + "-amqstore");
}
-
- public synchronized void start() throws Exception {
- if( !started.compareAndSet(false, true) )
+ public synchronized void start() throws Exception{
+ if(!started.compareAndSet(false,true))
return;
- if (this.usageManager!=null) {
+ if(this.directory==null){
+ this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
+ }
+ this.directory=new File(directory,"amqstore");
+ this.directory.mkdirs();
+ if(this.usageManager!=null){
this.usageManager.addUsageListener(this);
}
-
- if( asyncDataManager == null ) {
- asyncDataManager = createAsyncDataManager();
+ if(asyncDataManager==null){
+ asyncDataManager=createAsyncDataManager();
}
-
- if( referenceStoreAdapter==null ) {
- referenceStoreAdapter = createReferenceStoreAdapter();
+ if(referenceStoreAdapter==null){
+ referenceStoreAdapter=createReferenceStoreAdapter();
}
+ referenceStoreAdapter.setDirectory(new File(directory,"kaha-reference-store"));
+ referenceStoreAdapter.setBrokerName(getBrokerName());
referenceStoreAdapter.setUsageManager(usageManager);
-
- if( taskRunnerFactory==null ) {
- taskRunnerFactory = createTaskRunnerFactory();
+ if(taskRunnerFactory==null){
+ taskRunnerFactory=createTaskRunnerFactory();
}
-
- asyncDataManager.start();
- if( deleteAllMessages ) {
- asyncDataManager.delete();
- try {
- JournalTrace trace = new JournalTrace();
- trace.setMessage("DELETED "+new Date());
- Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
- asyncDataManager.setMark(location, true);
- log.info("Journal deleted: ");
- deleteAllMessages=false;
- } catch (IOException e) {
- throw e;
- } catch (Throwable e) {
- throw IOExceptionSupport.create(e);
- }
-
- referenceStoreAdapter.deleteAllMessages();
+ asyncDataManager.start();
+ if(deleteAllMessages){
+ asyncDataManager.delete();
+ try{
+ JournalTrace trace=new JournalTrace();
+ trace.setMessage("DELETED "+new Date());
+ Location location=asyncDataManager.write(wireFormat.marshal(trace),false);
+ asyncDataManager.setMark(location,true);
+ log.info("Journal deleted: ");
+ deleteAllMessages=false;
+ }catch(IOException e){
+ throw e;
+ }catch(Throwable e){
+ throw IOExceptionSupport.create(e);
+ }
+ referenceStoreAdapter.deleteAllMessages();
}
referenceStoreAdapter.start();
-
- Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
- log.info("Active data files: "+files);
-
- checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
- public boolean iterate() {
+ Set<Integer> files=referenceStoreAdapter.getReferenceFileIdsInUse();
+ log.info("Active data files: "+files);
+ checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
+
+ public boolean iterate(){
doCheckpoint();
return false;
}
- }, "ActiveMQ Journal Checkpoint Worker");
-
+ },"ActiveMQ Journal Checkpoint Worker");
createTransactionStore();
recover();
-
// Do a checkpoint periodically.
- periodicCheckpointTask = new Runnable() {
- public void run() {
+ periodicCheckpointTask=new Runnable(){
+
+ public void run(){
checkpoint(false);
- }
- };
- Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
-
- periodicCleanupTask = new Runnable() {
- public void run() {
- cleanup();
- }
- };
- Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
+ }
+ };
+ Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval);
+ periodicCleanupTask=new Runnable(){
+ public void run(){
+ cleanup();
+ }
+ };
+ Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval);
}
-
- public void stop() throws Exception {
-
- if( !started.compareAndSet(true, false) )
+ public void stop() throws Exception{
+ if(!started.compareAndSet(true,false))
return;
-
- this.usageManager.removeUsageListener(this);
+ this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
-
-
- Iterator<AMQMessageStore> iterator = queues.values().iterator();
- while (iterator.hasNext()) {
- AMQMessageStore ms = iterator.next();
+ Iterator<AMQMessageStore> iterator=queues.values().iterator();
+ while(iterator.hasNext()){
+ AMQMessageStore ms=iterator.next();
ms.stop();
}
-
- iterator = topics.values().iterator();
- while (iterator.hasNext()) {
- final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
+ iterator=topics.values().iterator();
+ while(iterator.hasNext()){
+ final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
ms.stop();
}
-
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
- checkpointTask.shutdown();
-
+ checkpointTask.shutdown();
queues.clear();
topics.clear();
-
- IOException firstException = null;
+ IOException firstException=null;
referenceStoreAdapter.stop();
- try {
+ try{
log.debug("Journal close");
asyncDataManager.close();
- } catch (Exception e) {
- firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+ }catch(Exception e){
+ firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
}
-
- if (firstException != null) {
+ if(firstException!=null){
throw firstException;
}
}
-
/**
* When we checkpoint we move all the journalled data to long term storage.
- * @param stopping
+ *
+ * @param stopping
*
* @param b
*/
- public void checkpoint(boolean sync) {
- try {
- if (asyncDataManager == null )
+ public void checkpoint(boolean sync){
+ try{
+ if(asyncDataManager==null)
throw new IllegalStateException("Journal is closed.");
-
- CountDownLatch latch = null;
- synchronized(this) {
- latch = nextCheckpointCountDownLatch;
+ CountDownLatch latch=null;
+ synchronized(this){
+ latch=nextCheckpointCountDownLatch;
}
-
checkpointTask.wakeup();
-
- if (sync) {
+ if(sync){
if(log.isDebugEnabled()){
log.debug("Waitng for checkpoint to complete.");
}
latch.await();
}
- }
- catch (InterruptedException e) {
+ referenceStoreAdapter.checkpoint(sync);
+ }catch(InterruptedException e){
Thread.currentThread().interrupt();
- log.warn("Request to start checkpoint failed: " + e, e);
+ log.warn("Request to start checkpoint failed: "+e,e);
+ }catch(IOException e){
+ log.error("checkpoint failed: "+e,e);
}
}
-
+
/**
* This does the actual checkpoint.
- * @return
+ *
+ * @return
*/
- public boolean doCheckpoint() {
- CountDownLatch latch = null;
- synchronized(this) {
- latch = nextCheckpointCountDownLatch;
- nextCheckpointCountDownLatch = new CountDownLatch(1);
- }
- try {
-
+ public boolean doCheckpoint(){
+ CountDownLatch latch=null;
+ synchronized(this){
+ latch=nextCheckpointCountDownLatch;
+ nextCheckpointCountDownLatch=new CountDownLatch(1);
+ }
+ try{
if(log.isDebugEnabled()){
log.debug("Checkpoint started.");
}
- referenceStoreAdapter.sync();
- Location newMark = null;
-
- Iterator<AMQMessageStore> iterator = queues.values().iterator();
- while (iterator.hasNext()) {
- final AMQMessageStore ms = iterator.next();
- Location mark = (Location) ms.getMark();
- if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
- newMark = mark;
+
+ Location newMark=null;
+ Iterator<AMQMessageStore> iterator=queues.values().iterator();
+ while(iterator.hasNext()){
+ final AMQMessageStore ms=iterator.next();
+ Location mark=(Location)ms.getMark();
+ if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
+ newMark=mark;
}
}
-
- iterator = topics.values().iterator();
- while (iterator.hasNext()) {
- final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
- Location mark = (Location) ms.getMark();
- if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
- newMark = mark;
+ iterator=topics.values().iterator();
+ while(iterator.hasNext()){
+ final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
+ Location mark=(Location)ms.getMark();
+ if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
+ newMark=mark;
}
}
-
- try {
- if (newMark != null) {
+ try{
+ if(newMark!=null){
if(log.isDebugEnabled()){
- log.debug("Marking journal at: " + newMark);
+ log.debug("Marking journal at: "+newMark);
}
- asyncDataManager.setMark(newMark, false);
- writeTraceMessage("CHECKPOINT "+new Date(), true);
+ asyncDataManager.setMark(newMark,false);
+ writeTraceMessage("CHECKPOINT "+new Date(),true);
}
+ }catch(Exception e){
+ log.error("Failed to mark the Journal: "+e,e);
}
- catch (Exception e) {
- log.error("Failed to mark the Journal: " + e, e);
- }
-
if(log.isDebugEnabled()){
log.debug("Checkpoint done.");
}
- }
- catch(IOException e) {
- log.error("Failed to sync reference store",e);
- }
- finally {
+ }finally{
latch.countDown();
}
return true;
@@ -330,197 +295,183 @@
/**
* Cleans up the data files
- * @return
- * @throws IOException
+ *
+ * @return
+ * @throws IOException
*/
- public void cleanup() {
-
- try {
- Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
- asyncDataManager.consolidateDataFilesNotIn(inUse);
- } catch (IOException e) {
- log.error("Could not cleanup data files: "+e, e);
- }
-
+ public void cleanup(){
+ try{
+ Set<Integer> inUse=referenceStoreAdapter.getReferenceFileIdsInUse();
+ asyncDataManager.consolidateDataFilesNotIn(inUse);
+ }catch(IOException e){
+ log.error("Could not cleanup data files: "+e,e);
+ }
}
-
- public Set<ActiveMQDestination> getDestinations() {
- Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
+ public Set<ActiveMQDestination> getDestinations(){
+ Set<ActiveMQDestination> destinations=new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
- private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
- if (destination.isQueue()) {
- return createQueueMessageStore((ActiveMQQueue) destination);
- }
- else {
- return createTopicMessageStore((ActiveMQTopic) destination);
+ private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
+ if(destination.isQueue()){
+ return createQueueMessageStore((ActiveMQQueue)destination);
+ }else{
+ return createTopicMessageStore((ActiveMQTopic)destination);
}
}
- public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- AMQMessageStore store = queues.get(destination);
- if (store == null) {
- ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
- store = new AMQMessageStore(this, checkpointStore, destination);
- try {
- store.start();
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
- }
- queues.put(destination, store);
+ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+ AMQMessageStore store=queues.get(destination);
+ if(store==null){
+ ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination);
+ store=new AMQMessageStore(this,checkpointStore,destination);
+ try{
+ store.start();
+ }catch(Exception e){
+ throw IOExceptionSupport.create(e);
+ }
+ queues.put(destination,store);
}
return store;
}
- public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
- AMQTopicMessageStore store = (AMQTopicMessageStore) topics.get(destinationName);
- if (store == null) {
- TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
- store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
- try {
- store.start();
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
- }
- topics.put(destinationName, store);
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException{
+ AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName);
+ if(store==null){
+ TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName);
+ store=new AMQTopicMessageStore(this,checkpointStore,destinationName);
+ try{
+ store.start();
+ }catch(Exception e){
+ throw IOExceptionSupport.create(e);
+ }
+ topics.put(destinationName,store);
}
return store;
}
- public TransactionStore createTransactionStore() throws IOException {
+ public TransactionStore createTransactionStore() throws IOException{
return transactionStore;
}
- public long getLastMessageBrokerSequenceId() throws IOException {
+ public long getLastMessageBrokerSequenceId() throws IOException{
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
- public void beginTransaction(ConnectionContext context) throws IOException {
+ public void beginTransaction(ConnectionContext context) throws IOException{
referenceStoreAdapter.beginTransaction(context);
}
- public void commitTransaction(ConnectionContext context) throws IOException {
+ public void commitTransaction(ConnectionContext context) throws IOException{
referenceStoreAdapter.commitTransaction(context);
}
- public void rollbackTransaction(ConnectionContext context) throws IOException {
+ public void rollbackTransaction(ConnectionContext context) throws IOException{
referenceStoreAdapter.rollbackTransaction(context);
}
-
/**
* @param location
* @return
* @throws IOException
*/
- public DataStructure readCommand(Location location) throws IOException {
- try {
- ByteSequence packet = asyncDataManager.read(location);
- return (DataStructure) wireFormat.unmarshal(packet);
- } catch (IOException e) {
- throw createReadException(location, e);
+ public DataStructure readCommand(Location location) throws IOException{
+ try{
+ ByteSequence packet=asyncDataManager.read(location);
+ return (DataStructure)wireFormat.unmarshal(packet);
+ }catch(IOException e){
+ throw createReadException(location,e);
}
}
/**
- * Move all the messages that were in the journal into long term storage. We
- * just replay and do a checkpoint.
+ * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidLocationException
* @throws IllegalStateException
*/
- private void recover() throws IllegalStateException, IOException {
-
- Location pos = null;
- int redoCounter = 0;
-
- log.info("Journal Recovery Started from: " + asyncDataManager);
- long start = System.currentTimeMillis();
- ConnectionContext context = new ConnectionContext();
-
+ private void recover() throws IllegalStateException,IOException{
+ Location pos=null;
+ int redoCounter=0;
+ log.info("Journal Recovery Started from: "+asyncDataManager);
+ long start=System.currentTimeMillis();
+ ConnectionContext context=new ConnectionContext();
// While we have records in the journal.
- while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
- ByteSequence data = asyncDataManager.read(pos);
- DataStructure c = (DataStructure) wireFormat.unmarshal(data);
-
- if (c instanceof Message ) {
- Message message = (Message) c;
- AMQMessageStore store = (AMQMessageStore) createMessageStore(message.getDestination());
- if ( message.isInTransaction()) {
- transactionStore.addMessage(store, message, pos);
- }
- else {
- if( store.replayAddMessage(context, message, pos) ) {
- redoCounter++;
+ while((pos=asyncDataManager.getNextLocation(pos))!=null){
+ ByteSequence data=asyncDataManager.read(pos);
+ DataStructure c=(DataStructure)wireFormat.unmarshal(data);
+ if(c instanceof Message){
+ Message message=(Message)c;
+ AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination());
+ if(message.isInTransaction()){
+ transactionStore.addMessage(store,message,pos);
+ }else{
+ if(store.replayAddMessage(context,message,pos)){
+ redoCounter++;
}
}
- } else {
- switch (c.getDataStructureType()) {
- case JournalQueueAck.DATA_STRUCTURE_TYPE:
- {
- JournalQueueAck command = (JournalQueueAck) c;
- AMQMessageStore store = (AMQMessageStore) createMessageStore(command.getDestination());
- if (command.getMessageAck().isInTransaction()) {
- transactionStore.removeMessage(store, command.getMessageAck(), pos);
- }
- else {
- if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
- redoCounter++;
+ }else{
+ switch(c.getDataStructureType()){
+ case JournalQueueAck.DATA_STRUCTURE_TYPE: {
+ JournalQueueAck command=(JournalQueueAck)c;
+ AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination());
+ if(command.getMessageAck().isInTransaction()){
+ transactionStore.removeMessage(store,command.getMessageAck(),pos);
+ }else{
+ if(store.replayRemoveMessage(context,command.getMessageAck())){
+ redoCounter++;
}
}
}
- break;
- case JournalTopicAck.DATA_STRUCTURE_TYPE:
- {
- JournalTopicAck command = (JournalTopicAck) c;
- AMQTopicMessageStore store = (AMQTopicMessageStore) createMessageStore(command.getDestination());
- if (command.getTransactionId() != null) {
- transactionStore.acknowledge(store, command, pos);
- }
- else {
- if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
- redoCounter++;
+ break;
+ case JournalTopicAck.DATA_STRUCTURE_TYPE: {
+ JournalTopicAck command=(JournalTopicAck)c;
+ AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination());
+ if(command.getTransactionId()!=null){
+ transactionStore.acknowledge(store,command,pos);
+ }else{
+ if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
+ .getMessageId())){
+ redoCounter++;
}
}
}
- break;
- case JournalTransaction.DATA_STRUCTURE_TYPE:
- {
- JournalTransaction command = (JournalTransaction) c;
- try {
+ break;
+ case JournalTransaction.DATA_STRUCTURE_TYPE: {
+ JournalTransaction command=(JournalTransaction)c;
+ try{
// Try to replay the packet.
- switch (command.getType()) {
+ switch(command.getType()){
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
- Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
- if (tx == null)
+ Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
+ if(tx==null)
break; // We may be trying to replay a commit that
- // was already committed.
-
+ // was already committed.
// Replay the committed operations.
tx.getOperations();
- for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
- TxOperation op = (TxOperation) iter.next();
- if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
- if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
+ for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
+ TxOperation op=(TxOperation)iter.next();
+ if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
+ if(op.store.replayAddMessage(context,(Message)op.data,op.location))
redoCounter++;
}
- if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
- if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
+ if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
+ if(op.store.replayRemoveMessage(context,(MessageAck)op.data))
redoCounter++;
}
- if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
- JournalTopicAck ack = (JournalTopicAck) op.data;
- if( ((AMQTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
+ if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
+ JournalTopicAck ack=(JournalTopicAck)op.data;
+ if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
+ .getSubscritionName(),ack.getMessageId())){
redoCounter++;
}
}
@@ -531,42 +482,40 @@
transactionStore.replayRollback(command.getTransactionId());
break;
}
- }
- catch (IOException e) {
- log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+ }catch(IOException e){
+ log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
}
}
- break;
+ break;
case JournalTrace.DATA_STRUCTURE_TYPE:
- JournalTrace trace = (JournalTrace) c;
- log.debug("TRACE Entry: " + trace.getMessage());
+ JournalTrace trace=(JournalTrace)c;
+ log.debug("TRACE Entry: "+trace.getMessage());
break;
default:
- log.error("Unknown type of record in transaction log which will be discarded: " + c);
+ log.error("Unknown type of record in transaction log which will be discarded: "+c);
}
}
}
- Location location = writeTraceMessage("RECOVERED "+new Date(), true);
- asyncDataManager.setMark(location, true);
- long end = System.currentTimeMillis();
-
- log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
+ Location location=writeTraceMessage("RECOVERED "+new Date(),true);
+ asyncDataManager.setMark(location,true);
+ long end=System.currentTimeMillis();
+ log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds.");
}
- private IOException createReadException(Location location, Exception e) {
- return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
+ private IOException createReadException(Location location,Exception e){
+ return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
}
- protected IOException createWriteException(DataStructure packet, Exception e) {
- return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
+ protected IOException createWriteException(DataStructure packet,Exception e){
+ return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
}
- protected IOException createWriteException(String command, Exception e) {
- return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
+ protected IOException createWriteException(String command,Exception e){
+ return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
}
- protected IOException createRecoveryFailedException(Exception e) {
- return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
+ protected IOException createRecoveryFailedException(Exception e){
+ return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
}
/**
@@ -576,118 +525,119 @@
* @return
* @throws IOException
*/
- public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
- return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite));
+ public Location writeCommand(DataStructure command,boolean syncHint) throws IOException{
+ return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite));
}
- private Location writeTraceMessage(String message, boolean sync) throws IOException {
- JournalTrace trace = new JournalTrace();
+ private Location writeTraceMessage(String message,boolean sync) throws IOException{
+ JournalTrace trace=new JournalTrace();
trace.setMessage(message);
- return writeCommand(trace, sync);
+ return writeCommand(trace,sync);
}
- public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
- newPercentUsage = ((newPercentUsage)/10)*10;
- oldPercentUsage = ((oldPercentUsage)/10)*10;
- if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
+ public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+ newPercentUsage=((newPercentUsage)/10)*10;
+ oldPercentUsage=((oldPercentUsage)/10)*10;
+ if(newPercentUsage>=70&&oldPercentUsage<newPercentUsage){
checkpoint(false);
}
}
-
- public AMQTransactionStore getTransactionStore() {
+
+ public AMQTransactionStore getTransactionStore(){
return transactionStore;
}
- public void deleteAllMessages() throws IOException {
- deleteAllMessages=true;
+ public void deleteAllMessages() throws IOException{
+ deleteAllMessages=true;
}
-
-
public String toString(){
- return "AMQPersistenceAdapter(" + directory + ")";
+ return "AMQPersistenceAdapter("+directory+")";
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Subclass overridables
- ///////////////////////////////////////////////////////////////////
- protected AsyncDataManager createAsyncDataManager() {
- AsyncDataManager manager = new AsyncDataManager();
- manager.setDirectory(new File(directory, "journal"));
- return manager;
- }
-
- protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
- KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory);
- return adaptor;
- }
-
- protected TaskRunnerFactory createTaskRunnerFactory() {
- return DefaultThreadPools.getDefaultTaskRunnerFactory();
- }
+ // /////////////////////////////////////////////////////////////////
+ protected AsyncDataManager createAsyncDataManager(){
+ AsyncDataManager manager=new AsyncDataManager();
+ manager.setDirectory(new File(directory,"journal"));
+ return manager;
+ }
+ protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException{
+ KahaReferenceStoreAdapter adaptor=new KahaReferenceStoreAdapter();
+ return adaptor;
+ }
+
+ protected TaskRunnerFactory createTaskRunnerFactory(){
+ return DefaultThreadPools.getDefaultTaskRunnerFactory();
+ }
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Property Accessors
- ///////////////////////////////////////////////////////////////////
-
- public AsyncDataManager getAsyncDataManager() {
- return asyncDataManager;
- }
- public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
- this.asyncDataManager = asyncDataManager;
- }
-
- public ReferenceStoreAdapter getReferenceStoreAdapter() {
- return referenceStoreAdapter;
- }
-
-
- public TaskRunnerFactory getTaskRunnerFactory() {
- return taskRunnerFactory;
- }
- public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
- this.taskRunnerFactory = taskRunnerFactory;
- }
+ // /////////////////////////////////////////////////////////////////
+ public AsyncDataManager getAsyncDataManager(){
+ return asyncDataManager;
+ }
+
+ public void setAsyncDataManager(AsyncDataManager asyncDataManager){
+ this.asyncDataManager=asyncDataManager;
+ }
+
+ public ReferenceStoreAdapter getReferenceStoreAdapter(){
+ return referenceStoreAdapter;
+ }
+
+ public TaskRunnerFactory getTaskRunnerFactory(){
+ return taskRunnerFactory;
+ }
+
+ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
+ this.taskRunnerFactory=taskRunnerFactory;
+ }
/**
* @return Returns the wireFormat.
*/
- public WireFormat getWireFormat() {
+ public WireFormat getWireFormat(){
return wireFormat;
}
- public void setWireFormat(WireFormat wireFormat) {
- this.wireFormat = wireFormat;
- }
- public UsageManager getUsageManager() {
+ public void setWireFormat(WireFormat wireFormat){
+ this.wireFormat=wireFormat;
+ }
+
+ public UsageManager getUsageManager(){
return usageManager;
}
- public void setUsageManager(UsageManager usageManager) {
- this.usageManager = usageManager;
+
+ public void setUsageManager(UsageManager usageManager){
+ this.usageManager=usageManager;
}
- public int getMaxCheckpointMessageAddSize() {
+ public int getMaxCheckpointMessageAddSize(){
return maxCheckpointMessageAddSize;
}
- public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
- this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+
+ public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
+ this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
}
- public int getMaxCheckpointWorkers() {
+ public int getMaxCheckpointWorkers(){
return maxCheckpointWorkers;
}
- public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
- this.maxCheckpointWorkers = maxCheckpointWorkers;
+
+ public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
+ this.maxCheckpointWorkers=maxCheckpointWorkers;
}
- public File getDirectory() {
- return directory;
- }
+ public File getDirectory(){
+ return directory;
+ }
- public void setDirectory(File directory) {
- this.directory = directory;
- }
+ public void setDirectory(File directory){
+ this.directory=directory;
+ }
public boolean isSyncOnWrite(){
return this.syncOnWrite;
@@ -696,7 +646,4 @@
public void setSyncOnWrite(boolean syncOnWrite){
this.syncOnWrite=syncOnWrite;
}
-
-
-
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?view=auto&rev=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Tue Mar 6 02:25:48 2007
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterFactory;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * An implementation of {@link PersistenceAdapterFactory}
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.17 $
+ */
+public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory{
+
+ private TaskRunnerFactory taskRunnerFactory;
+ private File dataDirectory;
+ private int journalThreadPriority = Thread.MAX_PRIORITY;
+ private String brokerName="localhost";
+
+ /**
+ * @return an AMQPersistenceAdapter
+ * @see org.apache.activemq.store.PersistenceAdapterFactory#createPersistenceAdapter()
+ */
+ public PersistenceAdapter createPersistenceAdapter(){
+ AMQPersistenceAdapter result = new AMQPersistenceAdapter();
+ result.setDirectory(getDataDirectory());
+ result.setTaskRunnerFactory(getTaskRunnerFactory());
+ result.setBrokerName(getBrokerName());
+ return result;
+ }
+
+ /**
+ * @return the dataDirectory
+ */
+ public File getDataDirectory(){
+ if(this.dataDirectory==null){
+ this.dataDirectory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
+ }
+ return this.dataDirectory;
+ }
+
+ /**
+ * @param dataDirectory the dataDirectory to set
+ */
+ public void setDataDirectory(File dataDirectory){
+ this.dataDirectory=dataDirectory;
+ }
+
+ /**
+ * @return the taskRunnerFactory
+ */
+ public TaskRunnerFactory getTaskRunnerFactory(){
+ if( taskRunnerFactory == null ) {
+ taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, true, 1000);
+ }
+ return taskRunnerFactory;
+ }
+
+ /**
+ * @param taskRunnerFactory the taskRunnerFactory to set
+ */
+ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
+ this.taskRunnerFactory=taskRunnerFactory;
+ }
+
+
+ /**
+ * @return the journalThreadPriority
+ */
+ public int getJournalThreadPriority(){
+ return this.journalThreadPriority;
+ }
+
+
+ /**
+ * @param journalThreadPriority the journalThreadPriority to set
+ */
+ public void setJournalThreadPriority(int journalThreadPriority){
+ this.journalThreadPriority=journalThreadPriority;
+ }
+
+
+ /**
+ * @return the brokerName
+ */
+ public String getBrokerName(){
+ return this.brokerName;
+ }
+
+
+ /**
+ * @param brokerName the brokerName to set
+ */
+ public void setBrokerName(String brokerName){
+ this.brokerName=brokerName;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Tue Mar 6 02:25:48 2007
@@ -43,6 +43,7 @@
import javax.sql.DataSource;
+import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
@@ -483,7 +484,16 @@
return new DefaultDatabaseLocker(getDataSource(), getStatements());
}
+ public void setBrokerName(String brokerName){
+ }
+
public String toString(){
return "JDBCPersistenceAdaptor("+super.toString()+")";
+ }
+
+ public void setDirectory(File dir){
+ }
+
+ public void checkpoint(boolean sync) throws IOException{
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Tue Mar 6 02:25:48 2007
@@ -17,6 +17,7 @@
*/
package org.apache.activemq.store.journal;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -314,6 +315,10 @@
log.warn("Request to start checkpoint failed: " + e, e);
}
}
+
+ public void checkpoint(boolean sync) {
+ checkpoint(sync,sync);
+ }
/**
* This does the actual checkpoint.
@@ -666,8 +671,15 @@
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
+ public void setBrokerName(String brokerName){
+ longTermPersistence.setBrokerName(brokerName);
+ }
+
public String toString(){
return "JournalPersistenceAdapator(" + longTermPersistence + ")";
+ }
+
+ public void setDirectory(File dir){
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?view=auto&rev=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Tue Mar 6 02:25:48 2007
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.journal;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.journal.active.JournalLockedException;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterFactory;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.JDBCAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.Statements;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Factory class that can create PersistenceAdapter objects.
+ *
+ * @version $Revision: 1.4 $
+ */
+public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
+
+ private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000;
+
+ private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
+
+ private int journalLogFileSize = 1024*1024*20;
+ private int journalLogFiles = 2;
+ private TaskRunnerFactory taskRunnerFactory;
+ private Journal journal;
+ private boolean useJournal=true;
+ private boolean useQuickJournal=false;
+ private File journalArchiveDirectory;
+ private boolean failIfJournalIsLocked=false;
+ private int journalThreadPriority = Thread.MAX_PRIORITY;
+ private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+
+ public PersistenceAdapter createPersistenceAdapter() throws IOException {
+ jdbcPersistenceAdapter.setDataSource(getDataSource());
+
+ if( !useJournal ) {
+ return jdbcPersistenceAdapter;
+ }
+ return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
+
+ }
+
+ public int getJournalLogFiles() {
+ return journalLogFiles;
+ }
+
+ /**
+ * Sets the number of journal log files to use
+ */
+ public void setJournalLogFiles(int journalLogFiles) {
+ this.journalLogFiles = journalLogFiles;
+ }
+
+ public int getJournalLogFileSize() {
+ return journalLogFileSize;
+ }
+
+ /**
+ * Sets the size of the journal log files
+ *
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
+ */
+ public void setJournalLogFileSize(int journalLogFileSize) {
+ this.journalLogFileSize = journalLogFileSize;
+ }
+
+ public JDBCPersistenceAdapter getJdbcAdapter() {
+ return jdbcPersistenceAdapter;
+ }
+
+ public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
+ this.jdbcPersistenceAdapter = jdbcAdapter;
+ }
+
+ public boolean isUseJournal() {
+ return useJournal;
+ }
+
+ /**
+ * Enables or disables the use of the journal. The default is to use the journal.
+ *
+ * @param useJournal
+ */
+ public void setUseJournal(boolean useJournal) {
+ this.useJournal = useJournal;
+ }
+
+ public TaskRunnerFactory getTaskRunnerFactory() {
+ if( taskRunnerFactory == null ) {
+ taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000);
+ }
+ return taskRunnerFactory;
+ }
+
+ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+ this.taskRunnerFactory = taskRunnerFactory;
+ }
+
+ public Journal getJournal() throws IOException {
+ if( journal == null ) {
+ createJournal();
+ }
+ return journal;
+ }
+
+ public void setJournal(Journal journal) {
+ this.journal = journal;
+ }
+
+ public File getJournalArchiveDirectory() {
+ if( journalArchiveDirectory == null && useQuickJournal ) {
+ journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
+ }
+ return journalArchiveDirectory;
+ }
+
+ public void setJournalArchiveDirectory(File journalArchiveDirectory) {
+ this.journalArchiveDirectory = journalArchiveDirectory;
+ }
+
+
+ public boolean isUseQuickJournal() {
+ return useQuickJournal;
+ }
+
+ /**
+ * Enables or disables the use of quick journal, which keeps messages in the journal and just
+ * stores a reference to the messages in JDBC. Defaults to false so that messages actually reside
+ * long term in the JDBC database.
+ */
+ public void setUseQuickJournal(boolean useQuickJournal) {
+ this.useQuickJournal = useQuickJournal;
+ }
+
+ public JDBCAdapter getAdapter() throws IOException {
+ return jdbcPersistenceAdapter.getAdapter();
+ }
+
+ public void setAdapter(JDBCAdapter adapter) {
+ jdbcPersistenceAdapter.setAdapter(adapter);
+ }
+
+ public Statements getStatements() {
+ return jdbcPersistenceAdapter.getStatements();
+ }
+ public void setStatements(Statements statements) {
+ jdbcPersistenceAdapter.setStatements(statements);
+ }
+
+ public boolean isUseDatabaseLock() {
+ return jdbcPersistenceAdapter.isUseDatabaseLock();
+ }
+
+ /**
+ * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
+ */
+ public void setUseDatabaseLock(boolean useDatabaseLock) {
+ jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
+ }
+
+ public boolean isCreateTablesOnStartup() {
+ return jdbcPersistenceAdapter.isCreateTablesOnStartup();
+ }
+
+ /**
+ * Sets whether or not tables are created on startup
+ */
+ public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
+ jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
+ }
+
+ public int getJournalThreadPriority(){
+ return journalThreadPriority;
+ }
+
+ /**
+ * Sets the thread priority of the journal thread
+ */
+ public void setJournalThreadPriority(int journalThreadPriority){
+ this.journalThreadPriority=journalThreadPriority;
+ }
+
+ /**
+ * @throws IOException
+ */
+ protected void createJournal() throws IOException {
+ File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
+ if( failIfJournalIsLocked ) {
+ journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+ } else {
+ while( true ) {
+ try {
+ journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+ break;
+ } catch (JournalLockedException e) {
+ log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked.");
+ try {
+ Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+ }
+ }
+
+
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Tue Mar 6 02:25:48 2007
@@ -74,10 +74,15 @@
removeMessage(ack.getLastMessageId());
}
+
+
public synchronized void removeMessage(MessageId msgId) throws IOException{
- messageContainer.remove(msgId);
- if(messageContainer.isEmpty()){
- resetBatching();
+ StoreEntry entry=messageContainer.getEntry(msgId);
+ if(entry!=null){
+ messageContainer.remove(entry);
+ if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+ resetBatching();
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Tue Mar 6 02:25:48 2007
@@ -41,38 +41,30 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
* @org.apache.xbean.XBean
*
* @version $Revision: 1.4 $
*/
public class KahaPersistenceAdapter implements PersistenceAdapter{
- private static final int STORE_LOCKED_WAIT_DELAY = 10*1000;
+
+ private static final int STORE_LOCKED_WAIT_DELAY=10*1000;
private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
KahaTransactionStore transactionStore;
- ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
- ConcurrentHashMap<ActiveMQQueue, MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
- ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
+ ConcurrentHashMap<ActiveMQTopic,TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,TopicMessageStore>();
+ ConcurrentHashMap<ActiveMQQueue,MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,MessageStore>();
+ ConcurrentHashMap<ActiveMQDestination,MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination,MessageStore>();
protected OpenWireFormat wireFormat=new OpenWireFormat();
private long maxDataFileLength=32*1024*1024;
-
-
- private File dir;
+ private File directory;
+ private String brokerName;
private Store theStore;
-
- public KahaPersistenceAdapter(File dir) throws IOException{
- if(!dir.exists()){
- dir.mkdirs();
- }
- this.dir=dir;
- wireFormat.setCacheEnabled(false);
- wireFormat.setTightEncodingEnabled(true);
- }
+ private boolean initialized;
public Set<ActiveMQDestination> getDestinations(){
Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>();
@@ -81,7 +73,7 @@
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
- rc.add((ActiveMQDestination) obj);
+ rc.add((ActiveMQDestination)obj);
}
}
}catch(IOException e){
@@ -127,25 +119,25 @@
}
public TransactionStore createTransactionStore() throws IOException{
-
if(transactionStore==null){
- while (true) {
- try {
- Store store=getStore();
- MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
- container.setKeyMarshaller(new CommandMarshaller(wireFormat));
- container.setValueMarshaller(new TransactionMarshaller(wireFormat));
- container.load();
- transactionStore=new KahaTransactionStore(this,container);
- break;
- }catch(StoreLockedExcpetion e) {
- log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)+" seconds for the Store to be unlocked.");
+ while(true){
+ try{
+ Store store=getStore();
+ MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
+ container.setKeyMarshaller(new CommandMarshaller(wireFormat));
+ container.setValueMarshaller(new TransactionMarshaller(wireFormat));
+ container.load();
+ transactionStore=new KahaTransactionStore(this,container);
+ break;
+ }catch(StoreLockedExcpetion e){
+ log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)
+ +" seconds for the Store to be unlocked.");
try{
Thread.sleep(STORE_LOCKED_WAIT_DELAY);
}catch(InterruptedException e1){
}
}
- }
+ }
}
return transactionStore;
}
@@ -163,6 +155,7 @@
}
public void start() throws Exception{
+ initialize();
}
public void stop() throws Exception{
@@ -182,37 +175,37 @@
}else{
theStore.delete();
}
- }else {
+ }else{
StoreFactory.delete(getStoreName());
}
}
protected MapContainer<MessageId,Message> getMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
- MapContainer<MessageId, Message> container=store.getMapContainer(id,containerName);
+ MapContainer<MessageId,Message> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new MessageIdMarshaller());
- container.setValueMarshaller(new MessageMarshaller(wireFormat));
+ container.setValueMarshaller(new MessageMarshaller(wireFormat));
container.load();
return container;
}
-
+
protected MapContainer<String,Object> getSubsMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
- MapContainer<String, Object> container=store.getMapContainer(id,containerName);
+ MapContainer<String,Object> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(Store.StringMarshaller);
- container.setValueMarshaller(createMessageMarshaller());
+ container.setValueMarshaller(createMessageMarshaller());
container.load();
return container;
}
- protected Marshaller<Object> createMessageMarshaller() {
- return new CommandMarshaller(wireFormat);
- }
+ protected Marshaller<Object> createMessageMarshaller(){
+ return new CommandMarshaller(wireFormat);
+ }
- protected ListContainer getListContainer(Object id,String containerName) throws IOException{
+ protected ListContainer getListContainer(Object id,String containerName) throws IOException{
Store store=getStore();
ListContainer container=store.getListContainer(id,containerName);
- container.setMarshaller(createMessageMarshaller());
+ container.setMarshaller(createMessageMarshaller());
container.load();
return container;
}
@@ -239,8 +232,6 @@
this.maxDataFileLength=maxDataFileLength;
}
-
-
protected synchronized Store getStore() throws IOException{
if(theStore==null){
theStore=StoreFactory.open(getStoreName(),"rw");
@@ -248,13 +239,50 @@
}
return theStore;
}
-
+
private String getStoreName(){
- String name=dir.getAbsolutePath()+File.separator+"kaha.db";
- return name;
+ initialize();
+ return directory.getAbsolutePath();
}
-
+
public String toString(){
- return "KahaPersistenceAdapter(" + getStoreName() +")";
+ return "KahaPersistenceAdapter("+getStoreName()+")";
+ }
+
+ public void setBrokerName(String brokerName){
+ this.brokerName=brokerName;
+ }
+
+ public String getBrokerName(){
+ return brokerName;
+ }
+
+ public File getDirectory(){
+ return this.directory;
+ }
+
+ public void setDirectory(File directory){
+ this.directory=directory;
+ }
+
+ public void checkpoint(boolean sync) throws IOException{
+ if(sync){
+ getStore().force();
+ }
+ }
+
+ private void initialize(){
+ if(!initialized){
+ initialized=true;
+ if(this.directory==null){
+ this.directory=new File(IOHelper.getDefaultDataDirectory());
+ this.directory=new File(this.directory,brokerName+"-kahastore");
+ }
+ this.directory.mkdirs();
+ wireFormat.setCacheEnabled(false);
+ wireFormat.setTightEncodingEnabled(true);
+ }
}
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Tue Mar 6 02:25:48 2007
@@ -77,7 +77,9 @@
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
+ if (entry != null) {
entry=messageContainer.getNext(entry);
+ }
}
if(entry!=null){
int count=0;
@@ -120,11 +122,14 @@
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
- ReferenceRecord rr=messageContainer.remove(msgId);
- if(rr!=null){
- removeInterest(rr);
- if(messageContainer.isEmpty()){
- resetBatching();
+ StoreEntry entry=messageContainer.getEntry(msgId);
+ if(entry!=null){
+ ReferenceRecord rr=messageContainer.remove(msgId);
+ if(rr!=null){
+ removeInterest(rr);
+ if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+ resetBatching();
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Tue Mar 6 02:25:48 2007
@@ -17,10 +17,8 @@
*/
package org.apache.activemq.store.kahadaptor;
-import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,7 +27,6 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageIdMarshaller;
@@ -39,7 +36,6 @@
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,10 +47,7 @@
private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
private boolean storeValid;
- public KahaReferenceStoreAdapter(File dir) throws IOException {
- super(dir);
- }
-
+
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
throw new RuntimeException("Use createQueueReferenceStore instead");
}
@@ -164,10 +157,7 @@
}
}
- public void sync() throws IOException {
- getStore().force();
- }
-
+
protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Tue Mar 6 02:25:48 2007
@@ -119,8 +119,9 @@
}
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
+ /*
if(retroactive){
- for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
+ for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(entry);
@@ -128,6 +129,7 @@
container.add(ref);
}
}
+ */
}
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Tue Mar 6 02:25:48 2007
@@ -17,7 +17,6 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -85,7 +84,8 @@
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
- container.add(ref);
+ StoreEntry listEntry = container.add(ref);
+
}
}
}
@@ -118,8 +118,8 @@
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
- String subcriberId=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
ConsumerMessageRef ref=container.remove();
if(container.isEmpty()){
@@ -140,6 +140,7 @@
removeInterest(rr);
}
}else{
+
ackContainer.update(ref.getAckEntry(),tsa);
}
}
@@ -163,13 +164,15 @@
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
- for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
+ /*
+ for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(entry);
ref.setMessageEntry(tsa.getMessageEntry());
container.add(ref);
}
+ */
}
}
@@ -186,7 +189,7 @@
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- return container.size();
+ return container != null ? container.size() : 0;
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
@@ -226,6 +229,7 @@
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
+
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Tue Mar 6 02:25:48 2007
@@ -54,14 +54,20 @@
return listContainer.isEmpty();
}
- public void add(ConsumerMessageRef ref) {
- listContainer.add(ref);
+ public StoreEntry add(ConsumerMessageRef ref) {
+ return listContainer.placeLast(ref);
}
- public ConsumerMessageRef remove() {
- ConsumerMessageRef result = (ConsumerMessageRef)listContainer.removeFirst();
- if (listContainer.isEmpty()) {
- reset();
+ public ConsumerMessageRef remove(){
+ ConsumerMessageRef result=null;
+ if(!listContainer.isEmpty()){
+ StoreEntry entry=listContainer.getFirst();
+ if(entry!=null){
+ result=(ConsumerMessageRef)listContainer.removeFirst();
+ if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+ reset();
+ }
+ }
}
return result;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Tue Mar 6 02:25:48 2007
@@ -158,4 +158,13 @@
public String toString(){
return "MemoryPersistenceAdapter";
}
+
+ public void setBrokerName(String brokerName){
+ }
+
+ public void setDirectory(File dir){
+ }
+
+ public void checkpoint(boolean sync) throws IOException{
+ }
}