You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/07/25 12:33:51 UTC

activemq git commit: AMQ-6372 - fix up accessor pool cleanupUnused and use the minimum number of open files for recovery

Repository: activemq
Updated Branches:
  refs/heads/master 6cc2c1190 -> d427952b1


AMQ-6372 - fix up accessor pool cleanupUnused and use the minimum number of open files for recovery


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d427952b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d427952b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d427952b

Branch: refs/heads/master
Commit: d427952b19f1763d5128d8a6c145d61dcefd79d9
Parents: 6cc2c11
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 25 13:33:24 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon Jul 25 13:33:24 2016 +0100

----------------------------------------------------------------------
 activemq-kahadb-store/pom.xml                   |  10 +
 .../activemq/store/kahadb/MessageDatabase.java  |  12 +-
 .../kahadb/MultiKahaDBTransactionStore.java     |   2 +-
 .../disk/journal/DataFileAccessorPool.java      |  16 +-
 .../store/kahadb/disk/journal/Journal.java      |   6 +-
 .../store/kahadb/JournalFdRecoveryTest.java     | 205 +++++++++++++++++++
 .../disk/journal/DataFileAccessorPoolTest.java  |  61 ++++++
 7 files changed, 295 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml
index 3d3bdbb..e1a993f 100755
--- a/activemq-kahadb-store/pom.xml
+++ b/activemq-kahadb-store/pom.xml
@@ -140,6 +140,16 @@
       <artifactId>log4j</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-legacy</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index c4d51af..365de7d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -646,6 +646,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
             if (recoveryPosition != null) {
                 int redoCounter = 0;
+                int dataFileRotationTracker = recoveryPosition.getDataFileId();
                 LOG.info("Recovering from the journal @" + recoveryPosition);
                 while (recoveryPosition != null) {
                     try {
@@ -663,9 +664,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                         }
                     }
                     recoveryPosition = journal.getNextLocation(recoveryPosition);
-                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
-                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
-                     }
+                    // hold on to the minimum number of open files during recovery
+                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
+                        dataFileRotationTracker = recoveryPosition.getDataFileId();
+                        journal.cleanup();
+                    }
+                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
+                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
+                    }
                 }
                 if (LOG.isInfoEnabled()) {
                     long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 90b7c4d..21d00c0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -275,7 +275,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         if (started.compareAndSet(false, true)) {
             journal = new Journal() {
                 @Override
-                protected void cleanup() {
+                public void cleanup() {
                     super.cleanup();
                     txStoreCleanup();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
index a04abda..d66c4b8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
@@ -39,7 +39,6 @@ public class DataFileAccessorPool {
 
         private final DataFile file;
         private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
-        private boolean used;
         private int openCounter;
         private boolean disposed;
 
@@ -54,7 +53,6 @@ public class DataFileAccessorPool {
             } else {
                 rc = pool.remove(pool.size() - 1);
             }
-            used = true;
             openCounter++;
             return rc;
         }
@@ -68,12 +66,8 @@ public class DataFileAccessorPool {
             }
         }
 
-        public synchronized void clearUsedMark() {
-            used = false;
-        }
-
         public synchronized boolean isUsed() {
-            return used;
+            return openCounter > 0;
         }
 
         public synchronized void dispose() {
@@ -94,13 +88,11 @@ public class DataFileAccessorPool {
         this.journal = dataManager;
     }
 
-    synchronized void clearUsedMark() {
-        for (Pool pool : pools.values()) {
-            pool.clearUsedMark();
-        }
+    public synchronized int size() {
+        return pools.size();
     }
 
-    synchronized void disposeUnused() {
+    public synchronized void disposeUnused() {
         for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
             Pool pool = iter.next();
             if (!pool.isUsed()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index ec1d91f..0b05c56 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -98,6 +98,10 @@ public class Journal {
         }
     }
 
+    public DataFileAccessorPool getAccessorPool() {
+        return accessorPool;
+    }
+
     public enum PreallocationStrategy {
         SPARSE_FILE,
         OS_KERNEL_COPY,
@@ -674,7 +678,7 @@ public class Journal {
         }
     }
 
-    protected synchronized void cleanup() {
+    public synchronized void cleanup() {
         if (accessorPool != null) {
             accessorPool.disposeUnused();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
new file mode 100644
index 0000000..308b18b
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.Attribute;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Deletes the KahaDB index file between two broker runs and checks the size
+ * of the journal's accessor pool once the second broker has started.
+ */
+public class JournalFdRecoveryTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
+
+    private static final String KAHADB_DIRECTORY = "target/activemq-data/";
+    private final String payload = new String(new byte[1024]);
+
+    private ActiveMQConnectionFactory cf = null;
+    private BrokerService broker = null;
+    private final Destination destination = new ActiveMQQueue("Test");
+    private String connectionUri;
+    private KahaDBPersistenceAdapter adapter;
+
+    public byte fill = Byte.valueOf("3");
+
+    /** Starts a new broker with delete-all-messages-on-startup enabled. */
+    protected void startBroker() throws Exception {
+        doStartBroker(true);
+    }
+
+    /**
+     * Stops the current broker, deletes the index file and starts a new
+     * broker with delete-all-messages-on-startup disabled.
+     *
+     * @throws IllegalStateException if no broker has been started yet
+     */
+    protected void restartBroker() throws Exception {
+        if (broker == null) {
+            throw new IllegalStateException("restartBroker called before a broker was started");
+        }
+        // look up the directory only once the broker reference is known to be valid
+        File dataDir = broker.getPersistenceAdapter().getDirectory();
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        whackIndex(dataDir);
+
+        doStartBroker(false);
+    }
+
+    /**
+     * Creates, configures and starts a new broker.
+     *
+     * @param delete value passed to {@code setDeleteAllMessagesOnStartup}
+     */
+    private void doStartBroker(boolean delete) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDataDirectory(KAHADB_DIRECTORY);
+        broker.addConnector("tcp://localhost:0");
+
+        configurePersistence(broker);
+
+        connectionUri = "vm://localhost?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
+
+        LOG.info("Starting broker..");
+        broker.start();
+    }
+
+    /** Configures the broker's KahaDB adapter and keeps a reference to it in {@code adapter}. */
+    protected void configurePersistence(BrokerService brokerService) throws Exception {
+        adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint and cleanup early and often
+        adapter.setCheckpointInterval(5000);
+        adapter.setCleanupInterval(5000);
+
+        adapter.setCheckForCorruptJournalFiles(true);
+        adapter.setIgnoreMissingJournalfiles(true);
+
+        adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testRecoveryAfterCorruption() throws Exception {
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFiles = getNumberOfJournalFiles();
+        LOG.info("Num journal files: " + numFiles);
+
+        assertTrue("more than x files: " + numFiles, numFiles > 4);
+
+        // startBroker() has just assigned the broker, so no null check is needed here
+        File dataDir = broker.getPersistenceAdapter().getDirectory();
+        broker.stop();
+        broker.waitUntilStopped();
+
+        long afterStop = totalOpenFileDescriptorCount(broker);
+        whackIndex(dataDir);
+
+        LOG.info("Num Open files with broker stopped: " + afterStop);
+
+        doStartBroker(false);
+
+        int pooledFiles = adapter.getStore().getJournal().getAccessorPool().size();
+        LOG.info("Journal read pool: " + pooledFiles);
+
+        assertEquals("one entry in the pool on start", 1, pooledFiles);
+
+        long afterRecovery = totalOpenFileDescriptorCount(broker);
+        LOG.info("Num Open files with broker recovered: " + afterRecovery);
+    }
+
+    /**
+     * Reads the {@code OpenFileDescriptorCount} attribute of the
+     * {@code java.lang:type=OperatingSystem} MBean through the broker's
+     * management context.
+     *
+     * @return the reported count, or 0 when the attribute could not be read
+     */
+    private long totalOpenFileDescriptorCount(BrokerService broker) {
+        long result = 0;
+        try {
+            javax.management.AttributeList list = broker.getManagementContext().getMBeanServer().getAttributes(new ObjectName("java.lang:type=OperatingSystem"), new String[]{"OpenFileDescriptorCount"});
+            if (!list.isEmpty()) {
+                result = ((Long) ((Attribute) list.get(0)).getValue());
+            }
+        } catch (Exception e) {
+            // the count is only logged for information, so report the problem and carry on
+            LOG.warn("Could not read OpenFileDescriptorCount", e);
+        }
+
+        return result;
+    }
+
+    /** Deletes the {@code db.data} file in the given directory, logging a warning on failure. */
+    private void whackIndex(File dataDir) {
+        File indexToDelete = new File(dataDir, "db.data");
+        LOG.info("Whacking index: " + indexToDelete);
+        if (!indexToDelete.delete()) {
+            // File.delete() reports failure only through its return value
+            LOG.warn("Failed to delete index: " + indexToDelete);
+        }
+    }
+
+    /** Counts the non-null entries in the journal's file map. */
+    private int getNumberOfJournalFiles() throws IOException {
+        Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+        return reality;
+    }
+
+    /**
+     * Sends {@code numToSend} text messages over a connection to the broker's
+     * first transport connector.
+     *
+     * @return the number of messages sent
+     */
+    private int produceMessages(Destination destination, int numToSend) throws Exception {
+        int sent = 0;
+        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        try {
+            // start inside the try so the connection is closed even if start() fails
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < numToSend; i++) {
+                producer.send(createMessage(session, i));
+                sent++;
+            }
+        } finally {
+            connection.close();
+        }
+
+        return sent;
+    }
+
+    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+        return produceMessages(destination, numToSend);
+    }
+
+    /** Builds a text message whose body is the payload followed by {@code "::"} and {@code i}. */
+    private Message createMessage(Session session, int i) throws Exception {
+        return session.createTextMessage(payload + "::" + i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java
new file mode 100644
index 0000000..bb8f16b
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks that {@code disposeUnused()} empties a pool whose only accessor has been closed. */
+public class DataFileAccessorPoolTest {
+    private Mockery context;
+
+    @Before
+    public void setUp() throws Exception {
+        Mockery mockery = new Mockery();
+        // Journal is a class, not an interface, so the class imposteriser is installed
+        mockery.setImposteriser(ClassImposteriser.INSTANCE);
+        context = mockery;
+    }
+
+    @Test
+    public void disposeUnused() throws Exception {
+        final Journal journal = context.mock(Journal.class);
+        final DataFileAccessorPool underTest = new DataFileAccessorPool(journal);
+
+        context.checking(new Expectations() {
+            {
+                exactly(1).of(journal).getInflightWrites();
+            }
+        });
+
+        // open an accessor and hand it straight back, leaving one entry in the pool
+        final DataFile dataFile = new DataFile(new File("aa"), 1);
+        underTest.closeDataFileAccessor(underTest.openDataFileAccessor(dataFile));
+        assertEquals("one in the pool", 1, underTest.size());
+
+        underTest.disposeUnused();
+        assertEquals("0 in the pool", 0, underTest.size());
+
+        context.assertIsSatisfied();
+    }
+}
\ No newline at end of file