You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/12 19:22:59 UTC

svn commit: r1455661 - in /activemq/trunk: activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java

Author: chirino
Date: Tue Mar 12 18:22:59 2013
New Revision: 1455661

URL: http://svn.apache.org/r1455661
Log:
Fixes AMQ-4368. Make sure we don't try to GC (clean up) the database midway through a store operation.

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
Modified:
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1455661&r1=1455660&r2=1455661&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Mar 12 18:22:59 2013
@@ -85,13 +85,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.Callback;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -231,6 +225,7 @@ public abstract class MessageDatabase ex
     private boolean enableIndexDiskSyncs = true;
     private boolean enableIndexRecoveryFile = true;
     private boolean enableIndexPageCaching = true;
+    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
 
     public MessageDatabase() {
     }
@@ -393,20 +388,15 @@ public abstract class MessageDatabase ex
 
     public void close() throws IOException, InterruptedException {
         if( opened.compareAndSet(true, false)) {
-            this.indexLock.writeLock().lock();
+            checkpointLock.writeLock().lock();
             try {
                 if (metadata.page != null) {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            checkpointUpdate(tx, true);
-                        }
-                    });
+                    checkpointUpdate(true);
                 }
                 pageFile.unload();
                 metadata = new Metadata();
             } finally {
-                this.indexLock.writeLock().unlock();
+                checkpointLock.writeLock().unlock();
             }
             journal.close();
             synchronized (checkpointThreadLock) {
@@ -844,16 +834,10 @@ public abstract class MessageDatabase ex
             if( !opened.get() ) {
                 return;
             }
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                @Override
-                public void execute(Transaction tx) throws IOException {
-                    checkpointUpdate(tx, cleanup);
-                }
-            });
         } finally {
             this.indexLock.writeLock().unlock();
         }
-
+        checkpointUpdate(cleanup);
         long end = System.currentTimeMillis();
         if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
             if (LOG.isInfoEnabled()) {
@@ -862,21 +846,6 @@ public abstract class MessageDatabase ex
         }
     }
 
-    public void checkpoint(Callback closure) throws Exception {
-        this.indexLock.writeLock().lock();
-        try {
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                @Override
-                public void execute(Transaction tx) throws IOException {
-                    checkpointUpdate(tx, false);
-                }
-            });
-            closure.execute();
-        } finally {
-            this.indexLock.writeLock().unlock();
-        }
-    }
-
     public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
         int size = data.serializedSizeFramed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -912,17 +881,26 @@ public abstract class MessageDatabase ex
         }
         try {
             ByteSequence sequence = toByteSequence(data);
-            long start = System.currentTimeMillis();
-            Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
-            long start2 = System.currentTimeMillis();
-            process(data, location, after);
-            long end = System.currentTimeMillis();
-            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+
+            Location location;
+            checkpointLock.readLock().lock();
+            try {
+
+                long start = System.currentTimeMillis();
+                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
+                long start2 = System.currentTimeMillis();
+                process(data, location, after);
+
+                long end = System.currentTimeMillis();
+                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+                    }
                 }
-            }
 
+            } finally{
+                checkpointLock.readLock().unlock();
+            }
             if (after != null) {
                 Runnable afterCompletion = null;
                 synchronized (orderedTransactionAfters) {
@@ -1384,6 +1362,26 @@ public abstract class MessageDatabase ex
         }
     }
 
+    private void checkpointUpdate(final boolean cleanup) throws IOException { // Runs checkpointUpdate(tx, cleanup) inside a page-file transaction while holding both write locks.
+        checkpointLock.writeLock().lock(); // outer lock; the journal-write + process(...) path in this diff takes the read side of this same lock
+        try {
+            this.indexLock.writeLock().lock(); // inner lock; in this method it is always acquired after checkpointLock
+            try {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() { // anonymous closure only forwards to the (tx, cleanup) overload
+                    @Override
+                    public void execute(Transaction tx) throws IOException {
+                        checkpointUpdate(tx, cleanup);
+                    }
+                });
+            } finally {
+                this.indexLock.writeLock().unlock(); // released even if the transaction throws
+            }
+
+        } finally {
+            checkpointLock.writeLock().unlock(); // released last, mirroring the acquisition order
+        }
+    }
+
     /**
      * @param tx
      * @throws IOException

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java?rev=1455661&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java Tue Mar 12 18:22:59 2013
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertTrue;
+
+public class AMQ4368Test { // Test added with the AMQ4368 fix: two producers and one consumer run against a KahaDB-backed broker and the consumer must keep making progress.
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory connectionFactory;
+    private final Destination destination = new ActiveMQQueue("large_message_queue"); // single queue shared by all clients
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception { // Builds the broker, adds a TCP connector, starts it and points the connection factory at it.
+        broker = createBroker();
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); // port 0: presumably lets the OS pick a free port — TODO confirm
+        broker.start();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void tearDown() throws Exception { // Stops the broker and blocks until it reports stopped.
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected BrokerService createBroker() throws Exception { // Creates (but does not start) a broker backed by a freshly emptied KahaDB store.
+        BrokerService broker = new BrokerService(); // local variable shadows the field of the same name
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setUseCache(false); // default destination policy with the cache turned off
+        broker.setDestinationPolicy(new PolicyMap());
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setChecksumJournalFiles(true);
+        kahadb.setCheckForCorruptJournalFiles(true);
+        kahadb.setCleanupInterval(1000); // presumably milliseconds, i.e. frequent cleanup while the clients are storing — TODO confirm unit
+
+        kahadb.deleteAllMessages(); // start from an empty store
+        broker.setPersistenceAdapter(kahadb);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024*1024*100); // 104857600; presumably bytes (100 MiB) — TODO confirm unit
+        return broker;
+    }
+
+    abstract class Client implements Runnable   { // Base for the worker clients: owns one connection/session and runs work() on its own thread.
+        private final String name;
+        final AtomicBoolean done = new AtomicBoolean(); // set to true to ask the work() loop to exit
+        CountDownLatch doneLatch = new CountDownLatch(1); // counted down once when run() finishes
+        Connection connection;
+        Session session;
+        final AtomicLong size = new AtomicLong(); // number of messages produced or consumed so far
+
+        Client(String name) {
+            this.name = name;
+        }
+
+        public void start() { // Launches run() on a new thread named after the client.
+            LOG.info("Starting: " + name);
+            new Thread(this, name).start();
+        }
+
+        public void stopAsync() { // Requests shutdown without waiting for it.
+            done.set(true);
+        }
+
+        public void stop() throws InterruptedException { // Requests shutdown and waits for run() to finish, closing the connection if it does not finish quickly.
+            stopAsync();
+            if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) { // worker still running after 20 ms
+                try {
+                    connection.close(); // presumably unblocks a worker stuck in send/receive — TODO confirm
+                    doneLatch.await(); // then wait without a timeout
+                } catch (Exception e) { // NOTE(review): everything is swallowed here, including an InterruptedException from await() and an NPE if connection was never assigned
+                }
+            }
+        }
+
+        @Override
+        public void run() { // Thread body: connect, create the session, run work(), then always close the connection and release doneLatch.
+            try {
+                connection = createConnection();
+                connection.start();
+                try {
+                    session = createSession();
+                    work();
+                } finally {
+                    try {
+                        connection.close();
+                    } catch (JMSException ignore) { // close failures are deliberately ignored
+                    }
+                    LOG.info("Stopped: " + name);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                done.set(true); // mark the client as finished after a failure
+            } finally {
+                doneLatch.countDown(); // lets stop() return
+            }
+        }
+
+        protected Session createSession() throws JMSException { // Non-transacted, auto-acknowledge session on this client's connection.
+            return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        protected Connection createConnection() throws JMSException { // New connection from the test's shared factory.
+            return connectionFactory.createConnection();
+        }
+
+        abstract protected void work() throws Exception; // the client's main loop; implementations poll 'done'
+    }
+
+    class ProducingClient extends Client { // Sends the same large text message to the queue in a loop.
+
+        ProducingClient(String name) {
+            super(name);
+        }
+
+        private String createMessage() { // Builds the payload: "1234567890" repeated 1,000,000 times (10,000,000 characters).
+            StringBuffer stringBuffer = new StringBuffer();
+            for (long i = 0; i < 1000000; i++) {
+                stringBuffer.append("1234567890");
+            }
+            return stringBuffer.toString();
+        }
+
+        @Override
+        protected void work() throws Exception { // Sends the payload repeatedly until 'done' is set, counting sends in 'size'.
+            String data = createMessage(); // built once and reused for every send
+            MessageProducer producer = session.createProducer(destination);
+            while (!done.get()) {
+                producer.send(session.createTextMessage(data));
+                long i = size.incrementAndGet();
+                if ((i % 1000) == 0) { // log progress every 1000 messages
+                    LOG.info("produced " + i + ".");
+                }
+            }
+        }
+    }
+
+    class ConsumingClient extends Client { // Receives from the queue in a loop and counts the messages it gets.
+
+        public ConsumingClient(String name) {
+            super(name);
+        }
+
+        @Override
+        protected void work() throws Exception { // Polls the queue until 'done' is set, counting received messages in 'size'.
+            MessageConsumer consumer = session.createConsumer(destination);
+            while (!done.get()) {
+                Message msg = consumer.receive(100); // bounded wait (100 ms per the JMS API) so 'done' is re-checked regularly
+                if (msg != null) {
+                    size.incrementAndGet();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testENTMQ220() throws InterruptedException, JMSException { // Runs two producers and one consumer and asserts the consumer advances in each of 10 two-second windows.
+        LOG.info("Start test.");
+
+        ProducingClient producer1 = new ProducingClient("1");
+        ProducingClient producer2 = new ProducingClient("2");
+        ConsumingClient listener1 = new ConsumingClient("subscriber-1");
+        try {
+
+            producer1.start();
+            producer2.start();
+            listener1.start();
+
+            long lastSize = listener1.size.get();
+            for (int i = 0; i < 10; i++) {
+                Thread.sleep(2000); // sampling window between progress checks
+                long size = listener1.size.get();
+                LOG.info("Listener 1: consumed: " + (size - lastSize));
+                assertTrue("No messages received on iteration: " + i, size > lastSize); // fails if the consumer received nothing during this window
+                lastSize = size;
+            }
+        } finally {
+            LOG.info("Stopping clients"); // clients are stopped whether or not the assertions passed
+            producer1.stop();
+            producer2.stop();
+            listener1.stop();
+        }
+    }
+}
\ No newline at end of file