You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/05 11:06:43 UTC

svn commit: r741061 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ main/java/org/apache/activemq/store/journal/ ma...

Author: gtully
Date: Thu Feb  5 10:06:39 2009
New Revision: 741061

URL: http://svn.apache.org/viewvc?rev=741061&view=rev
Log:
store specific tests for setBatch http://issues.apache.org/activemq/browse/AMQ-2020

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Feb  5 10:06:39 2009
@@ -158,10 +158,10 @@
             lastCachedId = node.getMessageId();
         } else {
             if (cacheEnabled) {
+                cacheEnabled=false;
                 // sync with store on disabling the cache
                 setBatch(lastCachedId);
             }
-            cacheEnabled=false;
         }
         size++;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Feb  5 10:06:39 2009
@@ -249,4 +249,9 @@
 
     }
 
+    @Override
+    public void setBatch(MessageId messageId) {
+        lastMessageId.set(messageId.getBrokerSequenceId());
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Feb  5 10:06:39 2009
@@ -720,7 +720,7 @@
             s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
             s.setMaxRows(maxReturned * 2);
             s.setString(1, destination.getQualifiedName());
-            s.setLong(2, nextSeq - maxReturned);
+            s.setLong(2, nextSeq);
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Thu Feb  5 10:06:39 2009
@@ -411,4 +411,10 @@
 
     }
 
+    @Override
+    public void setBatch(MessageId messageId) {
+        peristenceAdapter.checkpoint(true, true);
+        longTermStore.setBatch(messageId);
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Thu Feb  5 10:06:39 2009
@@ -175,4 +175,10 @@
     public boolean isSupportForCursors() {
         return true;
     }
+
+    @Override
+    public void setBatch(MessageId messageId) {
+        batchEntry = messageContainer.getEntry(messageId);
+    }
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Feb  5 10:06:39 2009
@@ -243,6 +243,11 @@
             cursorPos=0;
         }
 
+        
+        @Override
+        public void setBatch(MessageId messageId) {
+        }
+
         public void setMemoryUsage(MemoryUsage memoeyUSage) {
         }
         public void start() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Thu Feb  5 10:06:39 2009
@@ -150,4 +150,10 @@
     public void resetBatching() {
         lastBatchId = null;
     }
+
+    @Override
+    public void setBatch(MessageId messageId) {
+        lastBatchId = messageId;
+    }
+    
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java Thu Feb  5 10:06:39 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+
+/**
+ * @author gtully
+ * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ **/
+public class StoreQueueCursorJDBCNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+        broker.setPersistenceAdapter(persistenceAdapter);
+        return broker;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java Thu Feb  5 10:06:39 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+
+/**
+ * @author gtully
+ * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ **/
+public class StoreQueueCursorJournalNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        
+        File dataFileDir = new File("target/activemq-data/StoreQueueCursorJournalNoDuplicateTest");
+        File journalDir = new File(dataFileDir, "journal").getCanonicalFile();
+        JournalImpl journal = new JournalImpl(journalDir, 3, 1024 * 1024 * 20);
+
+        KahaPersistenceAdapter kahaAdaptor = new KahaPersistenceAdapter();
+        kahaAdaptor.setDirectory(dataFileDir);
+        JournalPersistenceAdapter journalAdaptor = new JournalPersistenceAdapter(journal, kahaAdaptor, broker.getTaskRunnerFactory());
+        journalAdaptor.setMaxCheckpointWorkers(1);
+
+        broker.setPersistenceAdapter(journalAdaptor);
+        return broker;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java Thu Feb  5 10:06:39 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * @author gtully
+ * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ **/
+public class StoreQueueCursorMemoryNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+  
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.setPersistent(false);
+        return broker;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=741061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java Thu Feb  5 10:06:39 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * @author gtully
+ * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ **/
+public class StoreQueueCursorNoDuplicateTest extends TestCase {
+    ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + StoreQueueCursorNoDuplicateTest.class.getSimpleName());
+    BrokerService brokerService;
+
+    final static String mesageIdRoot = "11111:22222:";
+    final int messageBytesSize = 1024;
+    final String text = new String(new byte[messageBytesSize]);
+
+    protected int count = 6;
+
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.setUseJmx(false);
+        brokerService.deleteAllMessages();
+        brokerService.start();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return new BrokerService();
+    }
+
+    public void tearDown() throws Exception {
+        brokerService.stop();
+    }
+
+    public void testNoDuplicateAfterCacheFullAndReadPast() throws Exception {
+        final PersistenceAdapter persistenceAdapter = brokerService
+                .getPersistenceAdapter();
+        final MessageStore queueMessageStore = persistenceAdapter
+                .createQueueMessageStore(destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+
+        final ConnectionContext contextNotInTx = new ConnectionContext();
+        for (int i = 0; i < count; i++) {
+            ActiveMQTextMessage msg = getMessage(i);
+            msg.setMemoryUsage(systemUsage.getMemoryUsage());
+
+            queueMessageStore.addMessage(contextNotInTx, msg);
+            underTest.addMessageLast(msg);
+        }
+
+        int dequeueCount = 0;
+
+        underTest.setMaxBatchSize(2);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            underTest.remove();
+            assertEquals(dequeueCount++, ref.getMessageId()
+                    .getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    private ActiveMQTextMessage getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        MessageId id = new MessageId(mesageIdRoot + i);
+        id.setBrokerSequenceId(i);
+        id.setProducerSequenceId(i);
+        message.setMessageId(id);
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        assertEquals(message.getMessageId().getProducerSequenceId(), i);
+        return message;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date