You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 11:39:05 UTC
svn commit: r492511 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store:
./ kahadaptor/ quick/
Author: chirino
Date: Thu Jan 4 02:39:04 2007
New Revision: 492511
URL: http://svn.apache.org/viewvc?view=rev&rev=492511
Log:
Quick store bug fixes.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Thu Jan 4 02:39:04 2007
@@ -54,6 +54,11 @@
public void setOffset(int offset) {
this.offset = offset;
}
+
+ @Override
+ public String toString() {
+ return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration;
+ }
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Jan 4 02:39:04 2007
@@ -95,7 +95,7 @@
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
- if(msg.messageId.equals(identity)){
+ if(msg.messageId.equals(identity.toString())){
result=msg;
cache.put(identity,entry);
break;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Thu Jan 4 02:39:04 2007
@@ -30,6 +30,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -167,6 +168,8 @@
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context, id, data);
+ System.out.println("referenceStore.put "+id+"-->"+data);
+
}
}
catch (Throwable e) {
@@ -332,6 +335,7 @@
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
+ System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue());
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
@@ -394,21 +398,23 @@
if( data==null ) {
data = referenceStore.getMessageReference(identity);
+ System.out.println("referenceStore.get "+identity+"-->"+data);
+ if( data==null ) {
+ return null;
+ }
}
-
- if( data==null ) {
- return null;
- }
-
- Message answer = null;
- if (answer != null ) {
- return answer;
- }
-
+
Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
- return (Message) peristenceAdapter.readCommand(location);
+
+ DataStructure rc = peristenceAdapter.readCommand(location);
+
+ try {
+ return (Message) rc;
+ } catch (ClassCastException e) {
+ throw new IOException("Could not read message "+identity+" at location "+location+", expected a message, but got: "+rc);
+ }
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Thu Jan 4 02:39:04 2007
@@ -88,7 +88,7 @@
private UsageManager usageManager;
- private long cleanupInterval = 1000 * 10;
+ private long cleanupInterval = 1000 * 1/10;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
@@ -147,15 +147,7 @@
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
- for (Integer fileId : files) {
- try {
- asyncDataManager.addInterestInFile(fileId);
- } catch (IOException e) {
- // We can expect these since referenceStoreAdapter is a litle behind in updates
- // and it might think it has references to data files that have allready come and gone..
- // This should get resolved once recovery kicks in.
- }
- }
+ log.info("Active data files: "+files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate() {
@@ -172,8 +164,7 @@
public void run() {
checkpoint(false);
}
- };
-
+ };
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
periodicCleanupTask = new Runnable() {
@@ -193,6 +184,7 @@
this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
+ Scheduler.cancel(periodicCleanupTask);
Iterator<QuickMessageStore> iterator = queues.values().iterator();
@@ -499,7 +491,7 @@
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
- op.store.replayAddMessage(context, (Message)op.data, pos);
+ op.store.replayAddMessage(context, (Message)op.data, op.location);
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
op.store.replayRemoveMessage(context, (MessageAck) op.data);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java Thu Jan 4 02:39:04 2007
@@ -56,11 +56,13 @@
public byte operationType;
public QuickMessageStore store;
public Object data;
+ public Location location;
- public TxOperation(byte operationType, QuickMessageStore store, Object data) {
+ public TxOperation(byte operationType, QuickMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
+ this.location=location;
}
}
@@ -77,16 +79,16 @@
this.location=location;
}
- public void add(QuickMessageStore store, Message msg) {
- operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
+ public void add(QuickMessageStore store, Message msg, Location location) {
+ operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
}
public void add(QuickMessageStore store, MessageAck ack) {
- operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
+ operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
}
public void add(QuickTopicMessageStore store, JournalTopicAck ack) {
- operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
+ operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
}
public Message[] getMessages() {
@@ -283,7 +285,7 @@
*/
void addMessage(QuickMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
- tx.add(store, message);
+ tx.add(store, message, location);
}
/**