You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/05 11:06:43 UTC
svn commit: r741061 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/jdbc/adapter/
main/java/org/apache/activemq/store/journal/ ma...
Author: gtully
Date: Thu Feb 5 10:06:39 2009
New Revision: 741061
URL: http://svn.apache.org/viewvc?rev=741061&view=rev
Log:
store specific tests for setBatch http://issues.apache.org/activemq/browse/AMQ-2020
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Feb 5 10:06:39 2009
@@ -158,10 +158,10 @@
lastCachedId = node.getMessageId();
} else {
if (cacheEnabled) {
+ cacheEnabled=false;
// sync with store on disabling the cache
setBatch(lastCachedId);
}
- cacheEnabled=false;
}
size++;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Feb 5 10:06:39 2009
@@ -249,4 +249,9 @@
}
+ @Override
+ public void setBatch(MessageId messageId) {
+ lastMessageId.set(messageId.getBrokerSequenceId());
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Feb 5 10:06:39 2009
@@ -720,7 +720,7 @@
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
- s.setLong(2, nextSeq - maxReturned);
+ s.setLong(2, nextSeq);
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Thu Feb 5 10:06:39 2009
@@ -411,4 +411,10 @@
}
+ @Override
+ public void setBatch(MessageId messageId) {
+ peristenceAdapter.checkpoint(true, true);
+ longTermStore.setBatch(messageId);
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Thu Feb 5 10:06:39 2009
@@ -175,4 +175,10 @@
public boolean isSupportForCursors() {
return true;
}
+
+ @Override
+ public void setBatch(MessageId messageId) {
+ batchEntry = messageContainer.getEntry(messageId);
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Feb 5 10:06:39 2009
@@ -243,6 +243,11 @@
cursorPos=0;
}
+
+ @Override
+ public void setBatch(MessageId messageId) {
+ }
+
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
public void start() throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Thu Feb 5 10:06:39 2009
@@ -150,4 +150,10 @@
public void resetBatching() {
lastBatchId = null;
}
+
+ @Override
+ public void setBatch(MessageId messageId) {
+ lastBatchId = messageId;
+ }
+
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java Thu Feb 5 10:06:39 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+
+/**
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorJDBCNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ PersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+ broker.setPersistenceAdapter(persistenceAdapter);
+ return broker;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java Thu Feb 5 10:06:39 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+
+/**
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorJournalNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+
+ File dataFileDir = new File("target/activemq-data/StoreQueueCursorJournalNoDuplicateTest");
+ File journalDir = new File(dataFileDir, "journal").getCanonicalFile();
+ JournalImpl journal = new JournalImpl(journalDir, 3, 1024 * 1024 * 20);
+
+ KahaPersistenceAdapter kahaAdaptor = new KahaPersistenceAdapter();
+ kahaAdaptor.setDirectory(dataFileDir);
+ JournalPersistenceAdapter journalAdaptor = new JournalPersistenceAdapter(journal, kahaAdaptor, broker.getTaskRunnerFactory());
+ journalAdaptor.setMaxCheckpointWorkers(1);
+
+ broker.setPersistenceAdapter(journalAdaptor);
+ return broker;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java Thu Feb 5 10:06:39 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorMemoryNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ broker.setPersistent(false);
+ return broker;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java Thu Feb 5 10:06:39 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorNoDuplicateTest extends TestCase {
+ ActiveMQQueue destination = new ActiveMQQueue("queue-"
+ + StoreQueueCursorNoDuplicateTest.class.getSimpleName());
+ BrokerService brokerService;
+
+ final static String mesageIdRoot = "11111:22222:";
+ final int messageBytesSize = 1024;
+ final String text = new String(new byte[messageBytesSize]);
+
+ protected int count = 6;
+
+ public void setUp() throws Exception {
+ brokerService = createBroker();
+ brokerService.setUseJmx(false);
+ brokerService.deleteAllMessages();
+ brokerService.start();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return new BrokerService();
+ }
+
+ public void tearDown() throws Exception {
+ brokerService.stop();
+ }
+
+ public void testNoDuplicateAfterCacheFullAndReadPast() throws Exception {
+ final PersistenceAdapter persistenceAdapter = brokerService
+ .getPersistenceAdapter();
+ final MessageStore queueMessageStore = persistenceAdapter
+ .createQueueMessageStore(destination);
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ queueMessageStore.start();
+
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+ SystemUsage systemUsage = new SystemUsage();
+ // ensure memory limit is reached
+ systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
+ underTest.setSystemUsage(systemUsage);
+ underTest.setEnableAudit(false);
+ underTest.start();
+
+ final ConnectionContext contextNotInTx = new ConnectionContext();
+ for (int i = 0; i < count; i++) {
+ ActiveMQTextMessage msg = getMessage(i);
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+
+ queueMessageStore.addMessage(contextNotInTx, msg);
+ underTest.addMessageLast(msg);
+ }
+
+ int dequeueCount = 0;
+
+ underTest.setMaxBatchSize(2);
+ underTest.reset();
+ while (underTest.hasNext() && dequeueCount < count) {
+ MessageReference ref = underTest.next();
+ underTest.remove();
+ assertEquals(dequeueCount++, ref.getMessageId()
+ .getProducerSequenceId());
+ }
+ underTest.release();
+ assertEquals(count, dequeueCount);
+ }
+
+ private ActiveMQTextMessage getMessage(int i) throws Exception {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ MessageId id = new MessageId(mesageIdRoot + i);
+ id.setBrokerSequenceId(i);
+ id.setProducerSequenceId(i);
+ message.setMessageId(id);
+ message.setDestination(destination);
+ message.setPersistent(true);
+ message.setResponseRequired(true);
+ message.setText("Msg:" + i + " " + text);
+ assertEquals(message.getMessageId().getProducerSequenceId(), i);
+ return message;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date