You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/01/21 12:37:23 UTC
svn commit: r1436291 - in /activemq/trunk:
activemq-core/src/test/java/org/apache/activemq/bugs/
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/
Author: gtully
Date: Mon Jan 21 11:37:23 2013
New Revision: 1436291
URL: http://svn.apache.org/viewvc?rev=1436291&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4172 - resolve with test. inflight transactions need to just protect a data file range rather than all subsequent data files. so gc can reclaim what is valid
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1436291&r1=1436290&r2=1436291&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java Mon Jan 21 11:37:23 2013
@@ -54,7 +54,7 @@ public class AMQ2736Test {
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
KahaDBStore store = pa.getStore();
- assertNotNull("last tx location is present " + store.getFirstInProgressTxLocation());
+ assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]);
// test hack, close the journal to ensure no further journal updates when broker stops
// mimic kill -9 in terms of no normal shutdown sequence
@@ -74,7 +74,7 @@ public class AMQ2736Test {
store = pa.getStore();
// inflight non xa tx should be rolledback on recovery
- assertNull("in progress tx location is present ", store.getFirstInProgressTxLocation());
+ assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]);
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java?rev=1436291&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java Mon Jan 21 11:37:23 2013
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertTrue;
+
+public class TransactedStoreUsageSuspendResumeTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);
+
+ private static final int MAX_MESSAGES = 10000;
+
+ private static final String QUEUE_NAME = "test.queue";
+
+ private BrokerService broker;
+
+ private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES);
+ private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES);
+ private final CountDownLatch consumerStartLatch = new CountDownLatch(1);
+
+ private class ConsumerThread extends Thread {
+
+ @Override
+ public void run() {
+ try {
+
+ consumerStartLatch.await(30, TimeUnit.SECONDS);
+
+ ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ // wait for producer to stop
+ long currentSendCount;
+ do {
+ currentSendCount = messagesSentCountDown.getCount();
+ TimeUnit.SECONDS.sleep(5);
+ } while (currentSendCount != messagesSentCountDown.getCount());
+
+ LOG.info("Starting consumer at: " + currentSendCount);
+
+ MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+
+ do {
+ Message message = consumer.receive(1000);
+ if (message != null) {
+ session.commit();
+ messagesReceivedCountDown.countDown();
+ }
+ if (messagesReceivedCountDown.getCount() % 500 == 0) {
+ LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
+ }
+ } while (messagesReceivedCountDown.getCount() != 0);
+ consumer.close();
+ session.close();
+ connection.close();
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(true);
+
+ KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
+ kahaDB.setJournalMaxFileLength(500 * 1024);
+ kahaDB.setCleanupInterval(10*1000);
+ broker.setPersistenceAdapter(kahaDB);
+
+ broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024);
+
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ broker.stop();
+ }
+
+ @Test
+ public void testTransactedStoreUsageSuspendResume() throws Exception {
+
+ ConsumerThread thread = new ConsumerThread();
+ thread.start();
+ ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
+ sendExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sendMessages();
+ } catch (Exception ignored) {
+ }
+ }
+ });
+ sendExecutor.shutdown();
+ sendExecutor.awaitTermination(5, TimeUnit.MINUTES);
+
+ boolean allMessagesReceived = messagesReceivedCountDown.await(120, TimeUnit.SECONDS);
+ assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived);
+ }
+
+ private void sendMessages() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ factory.setAlwaysSyncSend(true);
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination queue = session.createQueue(QUEUE_NAME);
+ Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain");
+ MessageProducer producer = session.createProducer(null);
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[10]);
+
+ for (int i=0; i<4240; i++) {
+ // mostly fill the store with retained messages
+ // so consumer only has a small bit of store usage to work with
+ producer.send(retainQueue, message);
+ session.commit();
+ }
+
+ consumerStartLatch.countDown();
+ for (int i = 0; i < MAX_MESSAGES; i++) {
+ producer.send(queue, message);
+ if (i>0 && i%20 == 0) {
+ session.commit();
+ }
+ messagesSentCountDown.countDown();
+ if (i>0 && i%500 == 0) {
+ LOG.info("Sent : " + i);
+ }
+
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ connection.close();
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1436291&r1=1436290&r2=1436291&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Jan 21 11:37:23 2013
@@ -29,6 +29,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -421,7 +422,7 @@ public abstract class MessageDatabase ex
try {
if( pageFile != null && pageFile.isLoaded() ) {
metadata.state = CLOSED_STATE;
- metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+ metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
if (metadata.page != null) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -440,30 +441,36 @@ public abstract class MessageDatabase ex
// public for testing
@SuppressWarnings("rawtypes")
- public Location getFirstInProgressTxLocation() {
- Location l = null;
+ public Location[] getInProgressTxLocationRange() {
+ Location[] range = new Location[]{null, null};
synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
for (List<Operation> ops : inflightTransactions.values()) {
if (!ops.isEmpty()) {
- l = ops.get(0).getLocation();
- break;
+ trackMaxAndMin(range, ops);
}
}
}
if (!preparedTransactions.isEmpty()) {
for (List<Operation> ops : preparedTransactions.values()) {
if (!ops.isEmpty()) {
- Location t = ops.get(0).getLocation();
- if (l==null || t.compareTo(l) <= 0) {
- l = t;
- }
- break;
+ trackMaxAndMin(range, ops);
}
}
}
}
- return l;
+ return range;
+ }
+
+ private void trackMaxAndMin(Location[] range, List<Operation> ops) {
+ Location t = ops.get(0).getLocation();
+ if (range[0]==null || t.compareTo(range[0]) <= 0) {
+ range[0] = t;
+ }
+ t = ops.get(ops.size() -1).getLocation();
+ if (range[1]==null || t.compareTo(range[1]) >= 0) {
+ range[1] = t;
+ }
}
class TranInfo {
@@ -1385,11 +1392,12 @@ public abstract class MessageDatabase ex
LOG.debug("Checkpoint started.");
// reflect last update exclusive of current checkpoint
- Location firstTxLocation = metadata.lastUpdate;
+ Location lastUpdate = metadata.lastUpdate;
metadata.state = OPEN_STATE;
metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
- metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+ Location[] inProgressTxRange = getInProgressTxLocationRange();
+ metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
tx.store(metadata.page, metadataMarshaller, true);
pageFile.flush();
@@ -1399,7 +1407,11 @@ public abstract class MessageDatabase ex
final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
if (LOG.isTraceEnabled()) {
- LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
+ LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
+ }
+
+ if (lastUpdate != null) {
+ gcCandidateSet.remove(lastUpdate.getDataFileId());
}
// Don't GC files under replication
@@ -1411,25 +1423,14 @@ public abstract class MessageDatabase ex
gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
}
- // Don't GC files after the first in progress tx
- if( metadata.firstInProgressTransactionLocation!=null ) {
- if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
- firstTxLocation = metadata.firstInProgressTransactionLocation;
+ // Don't GC files referenced by in-progress tx
+ if (inProgressTxRange[0] != null) {
+ for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
+ gcCandidateSet.remove(pendingTx);
}
}
-
- if( firstTxLocation!=null ) {
- while( !gcCandidateSet.isEmpty() ) {
- Integer last = gcCandidateSet.last();
- if( last >= firstTxLocation.getDataFileId() ) {
- gcCandidateSet.remove(last);
- } else {
- break;
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
}
// Go through all the destinations to see if any of them can remove GC candidates.