You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/02/15 14:33:02 UTC

svn commit: r1446577 - in /activemq/trunk: activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java

Author: gtully
Date: Fri Feb 15 13:33:01 2013
New Revision: 1446577

URL: http://svn.apache.org/r1446577
Log:
https://issues.apache.org/jira/browse/AMQ-4323 - fix and test

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java   (with props)
Modified:
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1446577&r1=1446576&r2=1446577&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 15 13:33:01 2013
@@ -1420,7 +1420,18 @@ public abstract class MessageDatabase ex
             }
 
             if (metadata.producerSequenceIdTrackerLocation != null) {
-                gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
+                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
+                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
+                    // rewrite so we don't prevent gc
+                    metadata.producerSequenceIdTracker.setModified(true);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
+                    }
+                }
+                gcCandidateSet.remove(dataFileId);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
+                }
             }
 
             // Don't GC files referenced by in-progress tx

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java?rev=1446577&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java Fri Feb 15 13:33:01 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ4323Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class);
+
+    BrokerService broker = null;
+    File kahaDbDir = null;
+    private final Destination destination = new ActiveMQQueue("q");
+    final String payload = new String(new byte[1024]);
+
+    protected void startBroker(boolean delete) throws Exception {
+        broker = new BrokerService();
+
+        //Start with a clean directory
+        kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
+        deleteDir(kahaDbDir);
+
+        broker.setSchedulerSupport(false);
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:0");
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setUseCache(false);
+        map.setDefaultEntry(entry);
+        broker.setDestinationPolicy(map);
+
+        configurePersistence(broker, delete);
+
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint and clean up early and often
+        adapter.setCheckpointInterval(500);
+        adapter.setCleanupInterval(500);
+
+        if (!deleteAllOnStart) {
+            adapter.setForceRecoverIndex(true);
+        }
+
+    }
+
+    private boolean deleteDir(File dir) {
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            for (int i = 0; i < children.length; i++) {
+                boolean success = deleteDir(new File(dir, children[i]));
+                if (!success) {
+                    return false;
+                }
+            }
+        }
+
+        return dir.delete();
+    }
+
+    private int getFileCount(File dir){
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            return children.length;
+        }
+
+        return 0;
+    }
+
+    @Test
+    public void testCleanupOfFiles() throws Exception {
+        final int messageCount = 500;
+        startBroker(true);
+        int fileCount = getFileCount(kahaDbDir);
+        assertEquals(4, fileCount);
+
+        Connection connection = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ProducerThread producer = new ProducerThread(producerSess, destination) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return sess.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer.setMessageCount(messageCount);
+        ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
+        consumer.setBreakOnNull(false);
+        consumer.setMessageCount(messageCount);
+
+        producer.start();
+        producer.join();
+
+        consumer.start();
+        consumer.join();
+
+        assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
+
+        // verify cleanup
+        assertTrue("gc worked", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                int fileCount = getFileCount(kahaDbDir);
+                LOG.info("current filecount:" + fileCount);
+                return 4 == fileCount;
+            }
+        }));
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+    }
+
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date