You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/12 19:22:59 UTC
svn commit: r1455661 - in /activemq/trunk:
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
Author: chirino
Date: Tue Mar 12 18:22:59 2013
New Revision: 1455661
URL: http://svn.apache.org/r1455661
Log:
Fixes AMQ-4368. Make sure we don't try to GC (checkpoint and clean up) the database midway through a store operation.
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
Modified:
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1455661&r1=1455660&r2=1455661&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Mar 12 18:22:59 2013
@@ -85,13 +85,7 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.Callback;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,6 +225,7 @@ public abstract class MessageDatabase ex
private boolean enableIndexDiskSyncs = true;
private boolean enableIndexRecoveryFile = true;
private boolean enableIndexPageCaching = true;
+ ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
public MessageDatabase() {
}
@@ -393,20 +388,15 @@ public abstract class MessageDatabase ex
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
- this.indexLock.writeLock().lock();
+ checkpointLock.writeLock().lock();
try {
if (metadata.page != null) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, true);
- }
- });
+ checkpointUpdate(true);
}
pageFile.unload();
metadata = new Metadata();
} finally {
- this.indexLock.writeLock().unlock();
+ checkpointLock.writeLock().unlock();
}
journal.close();
synchronized (checkpointThreadLock) {
@@ -844,16 +834,10 @@ public abstract class MessageDatabase ex
if( !opened.get() ) {
return;
}
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, cleanup);
- }
- });
} finally {
this.indexLock.writeLock().unlock();
}
-
+ checkpointUpdate(cleanup);
long end = System.currentTimeMillis();
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
if (LOG.isInfoEnabled()) {
@@ -862,21 +846,6 @@ public abstract class MessageDatabase ex
}
}
- public void checkpoint(Callback closure) throws Exception {
- this.indexLock.writeLock().lock();
- try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, false);
- }
- });
- closure.execute();
- } finally {
- this.indexLock.writeLock().unlock();
- }
- }
-
public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -912,17 +881,26 @@ public abstract class MessageDatabase ex
}
try {
ByteSequence sequence = toByteSequence(data);
- long start = System.currentTimeMillis();
- Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
- long start2 = System.currentTimeMillis();
- process(data, location, after);
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+
+ Location location;
+ checkpointLock.readLock().lock();
+ try {
+
+ long start = System.currentTimeMillis();
+ location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
+ long start2 = System.currentTimeMillis();
+ process(data, location, after);
+
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+ }
}
- }
+ } finally{
+ checkpointLock.readLock().unlock();
+ }
if (after != null) {
Runnable afterCompletion = null;
synchronized (orderedTransactionAfters) {
@@ -1384,6 +1362,26 @@ public abstract class MessageDatabase ex
}
}
+ private void checkpointUpdate(final boolean cleanup) throws IOException {
+ checkpointLock.writeLock().lock();
+ try {
+ this.indexLock.writeLock().lock();
+ try {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, cleanup);
+ }
+ });
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+
+ } finally {
+ checkpointLock.writeLock().unlock();
+ }
+ }
+
/**
* @param tx
* @throws IOException
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java?rev=1455661&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java Tue Mar 12 18:22:59 2013
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertTrue;
+
+public class AMQ4368Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class);
+
+ private BrokerService broker;
+ private ActiveMQConnectionFactory connectionFactory;
+ private final Destination destination = new ActiveMQQueue("large_message_queue");
+ private String connectionUri;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = createBroker();
+ connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+ broker.start();
+ connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setUseCache(false);
+ broker.setDestinationPolicy(new PolicyMap());
+ broker.getDestinationPolicy().setDefaultEntry(policy);
+
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setChecksumJournalFiles(true);
+ kahadb.setCheckForCorruptJournalFiles(true);
+ kahadb.setCleanupInterval(1000);
+
+ kahadb.deleteAllMessages();
+ broker.setPersistenceAdapter(kahadb);
+ broker.getSystemUsage().getMemoryUsage().setLimit(1024*1024*100);
+ return broker;
+ }
+
+ abstract class Client implements Runnable {
+ private final String name;
+ final AtomicBoolean done = new AtomicBoolean();
+ CountDownLatch doneLatch = new CountDownLatch(1);
+ Connection connection;
+ Session session;
+ final AtomicLong size = new AtomicLong();
+
+ Client(String name) {
+ this.name = name;
+ }
+
+ public void start() {
+ LOG.info("Starting: " + name);
+ new Thread(this, name).start();
+ }
+
+ public void stopAsync() {
+ done.set(true);
+ }
+
+ public void stop() throws InterruptedException {
+ stopAsync();
+ if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) {
+ try {
+ connection.close();
+ doneLatch.await();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ connection = createConnection();
+ connection.start();
+ try {
+ session = createSession();
+ work();
+ } finally {
+ try {
+ connection.close();
+ } catch (JMSException ignore) {
+ }
+ LOG.info("Stopped: " + name);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ done.set(true);
+ } finally {
+ doneLatch.countDown();
+ }
+ }
+
+ protected Session createSession() throws JMSException {
+ return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ protected Connection createConnection() throws JMSException {
+ return connectionFactory.createConnection();
+ }
+
+ abstract protected void work() throws Exception;
+ }
+
+ class ProducingClient extends Client {
+
+ ProducingClient(String name) {
+ super(name);
+ }
+
+ private String createMessage() {
+ StringBuffer stringBuffer = new StringBuffer();
+ for (long i = 0; i < 1000000; i++) {
+ stringBuffer.append("1234567890");
+ }
+ return stringBuffer.toString();
+ }
+
+ @Override
+ protected void work() throws Exception {
+ String data = createMessage();
+ MessageProducer producer = session.createProducer(destination);
+ while (!done.get()) {
+ producer.send(session.createTextMessage(data));
+ long i = size.incrementAndGet();
+ if ((i % 1000) == 0) {
+ LOG.info("produced " + i + ".");
+ }
+ }
+ }
+ }
+
+ class ConsumingClient extends Client {
+
+ public ConsumingClient(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void work() throws Exception {
+ MessageConsumer consumer = session.createConsumer(destination);
+ while (!done.get()) {
+ Message msg = consumer.receive(100);
+ if (msg != null) {
+ size.incrementAndGet();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testENTMQ220() throws InterruptedException, JMSException {
+ LOG.info("Start test.");
+
+ ProducingClient producer1 = new ProducingClient("1");
+ ProducingClient producer2 = new ProducingClient("2");
+ ConsumingClient listener1 = new ConsumingClient("subscriber-1");
+ try {
+
+ producer1.start();
+ producer2.start();
+ listener1.start();
+
+ long lastSize = listener1.size.get();
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(2000);
+ long size = listener1.size.get();
+ LOG.info("Listener 1: consumed: " + (size - lastSize));
+ assertTrue("No messages received on iteration: " + i, size > lastSize);
+ lastSize = size;
+ }
+ } finally {
+ LOG.info("Stopping clients");
+ producer1.stop();
+ producer2.stop();
+ listener1.stop();
+ }
+ }
+}
\ No newline at end of file