You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/07/25 12:33:51 UTC
activemq git commit: AMQ-6372 - fix up accessor pool cleanupUnused
and use the minimum number of open files for recovery
Repository: activemq
Updated Branches:
refs/heads/master 6cc2c1190 -> d427952b1
AMQ-6372 - fix up accessor pool cleanupUnused and use the minimum number of open files for recovery
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d427952b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d427952b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d427952b
Branch: refs/heads/master
Commit: d427952b19f1763d5128d8a6c145d61dcefd79d9
Parents: 6cc2c11
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 25 13:33:24 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon Jul 25 13:33:24 2016 +0100
----------------------------------------------------------------------
activemq-kahadb-store/pom.xml | 10 +
.../activemq/store/kahadb/MessageDatabase.java | 12 +-
.../kahadb/MultiKahaDBTransactionStore.java | 2 +-
.../disk/journal/DataFileAccessorPool.java | 16 +-
.../store/kahadb/disk/journal/Journal.java | 6 +-
.../store/kahadb/JournalFdRecoveryTest.java | 205 +++++++++++++++++++
.../disk/journal/DataFileAccessorPoolTest.java | 61 ++++++
7 files changed, 295 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml
index 3d3bdbb..e1a993f 100755
--- a/activemq-kahadb-store/pom.xml
+++ b/activemq-kahadb-store/pom.xml
@@ -140,6 +140,16 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock-legacy</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index c4d51af..365de7d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -646,6 +646,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (recoveryPosition != null) {
int redoCounter = 0;
+ int dataFileRotationTracker = recoveryPosition.getDataFileId();
LOG.info("Recovering from the journal @" + recoveryPosition);
while (recoveryPosition != null) {
try {
@@ -663,9 +664,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
recoveryPosition = journal.getNextLocation(recoveryPosition);
- if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
- LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
- }
+ // hold on to the minimum number of open files during recovery
+ if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
+ dataFileRotationTracker = recoveryPosition.getDataFileId();
+ journal.cleanup();
+ }
+ if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
+ LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
+ }
}
if (LOG.isInfoEnabled()) {
long end = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 90b7c4d..21d00c0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -275,7 +275,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
if (started.compareAndSet(false, true)) {
journal = new Journal() {
@Override
- protected void cleanup() {
+ public void cleanup() {
super.cleanup();
txStoreCleanup();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
index a04abda..d66c4b8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPool.java
@@ -39,7 +39,6 @@ public class DataFileAccessorPool {
private final DataFile file;
private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
- private boolean used;
private int openCounter;
private boolean disposed;
@@ -54,7 +53,6 @@ public class DataFileAccessorPool {
} else {
rc = pool.remove(pool.size() - 1);
}
- used = true;
openCounter++;
return rc;
}
@@ -68,12 +66,8 @@ public class DataFileAccessorPool {
}
}
- public synchronized void clearUsedMark() {
- used = false;
- }
-
public synchronized boolean isUsed() {
- return used;
+ return openCounter > 0;
}
public synchronized void dispose() {
@@ -94,13 +88,11 @@ public class DataFileAccessorPool {
this.journal = dataManager;
}
- synchronized void clearUsedMark() {
- for (Pool pool : pools.values()) {
- pool.clearUsedMark();
- }
+ public synchronized int size() {
+ return pools.size();
}
- synchronized void disposeUnused() {
+ public synchronized void disposeUnused() {
for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = iter.next();
if (!pool.isUsed()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index ec1d91f..0b05c56 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -98,6 +98,10 @@ public class Journal {
}
}
+ public DataFileAccessorPool getAccessorPool() {
+ return accessorPool;
+ }
+
public enum PreallocationStrategy {
SPARSE_FILE,
OS_KERNEL_COPY,
@@ -674,7 +678,7 @@ public class Journal {
}
}
- protected synchronized void cleanup() {
+ public synchronized void cleanup() {
if (accessorPool != null) {
accessorPool.disposeUnused();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
new file mode 100644
index 0000000..308b18b
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.Attribute;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JournalFdRecoveryTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
+
+ private final String KAHADB_DIRECTORY = "target/activemq-data/";
+ private final String payload = new String(new byte[1024]);
+
+ private ActiveMQConnectionFactory cf = null;
+ private BrokerService broker = null;
+ private final Destination destination = new ActiveMQQueue("Test");
+ private String connectionUri;
+ private KahaDBPersistenceAdapter adapter;
+
+ public byte fill = Byte.valueOf("3");
+
+ protected void startBroker() throws Exception {
+ doStartBroker(true);
+ }
+
+ protected void restartBroker() throws Exception {
+ File dataDir = broker.getPersistenceAdapter().getDirectory();
+
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ whackIndex(dataDir);
+
+ doStartBroker(false);
+ }
+
+ private void doStartBroker(boolean delete) throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(delete);
+ broker.setPersistent(true);
+ broker.setUseJmx(true);
+ broker.setDataDirectory(KAHADB_DIRECTORY);
+ broker.addConnector("tcp://localhost:0");
+
+ configurePersistence(broker);
+
+ connectionUri = "vm://localhost?create=false";
+ cf = new ActiveMQConnectionFactory(connectionUri);
+
+ broker.start();
+ LOG.info("Starting broker..");
+ }
+
+ protected void configurePersistence(BrokerService brokerService) throws Exception {
+ adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+ // ensure there are a bunch of data files but multiple entries in each
+ adapter.setJournalMaxFileLength(1024 * 20);
+
+ // speed up the test case, checkpoint an cleanup early and often
+ adapter.setCheckpointInterval(5000);
+ adapter.setCleanupInterval(5000);
+
+ adapter.setCheckForCorruptJournalFiles(true);
+ adapter.setIgnoreMissingJournalfiles(true);
+
+ adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+
+ @Test
+ public void testRecoveryAfterCorruption() throws Exception {
+ startBroker();
+
+ produceMessagesToConsumeMultipleDataFiles(50);
+
+ int numFiles = getNumberOfJournalFiles();
+ LOG.info("Num journal files: " + numFiles);
+
+ assertTrue("more than x files: " + numFiles, numFiles > 4);
+
+ File dataDir = broker.getPersistenceAdapter().getDirectory();
+
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ long afterStop = totalOpenFileDescriptorCount(broker);
+ whackIndex(dataDir);
+
+ LOG.info("Num Open files with broker stopped: " + afterStop);
+
+ doStartBroker(false);
+
+ LOG.info("Journal read pool: " + adapter.getStore().getJournal().getAccessorPool().size());
+
+ assertEquals("one entry in the pool on start", 1, adapter.getStore().getJournal().getAccessorPool().size());
+
+ long afterRecovery = totalOpenFileDescriptorCount(broker);
+ LOG.info("Num Open files with broker recovered: " + afterRecovery);
+
+ }
+
+ private long totalOpenFileDescriptorCount(BrokerService broker) {
+ long result = 0;
+ try {
+ javax.management.AttributeList list = broker.getManagementContext().getMBeanServer().getAttributes(new ObjectName("java.lang:type=OperatingSystem"), new String[]{"OpenFileDescriptorCount"});
+ if (!list.isEmpty()) {
+ result = ((Long) ((Attribute) list.get(0)).getValue());
+ }
+ } catch (Exception ignored) {
+ }
+
+ return result;
+ }
+
+ private void whackIndex(File dataDir) {
+ File indexToDelete = new File(dataDir, "db.data");
+ LOG.info("Whacking index: " + indexToDelete);
+ indexToDelete.delete();
+ }
+
+ private int getNumberOfJournalFiles() throws IOException {
+ Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+ int reality = 0;
+ for (DataFile file : files) {
+ if (file != null) {
+ reality++;
+ }
+ }
+ return reality;
+ }
+
+ private int produceMessages(Destination destination, int numToSend) throws Exception {
+ int sent = 0;
+ Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+ connection.start();
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < numToSend; i++) {
+ producer.send(createMessage(session, i));
+ sent++;
+ }
+ } finally {
+ connection.close();
+ }
+
+ return sent;
+ }
+
+ private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+ return produceMessages(destination, numToSend);
+ }
+
+ private Message createMessage(Session session, int i) throws Exception {
+ return session.createTextMessage(payload + "::" + i);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d427952b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java
new file mode 100644
index 0000000..bb8f16b
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorPoolTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class DataFileAccessorPoolTest {
+ private Mockery context;
+
+ @Before
+ public void setUp() throws Exception {
+ context = new Mockery() {
+ {
+ setImposteriser(ClassImposteriser.INSTANCE);
+ }
+ };
+ }
+
+ @Test
+ public void disposeUnused() throws Exception {
+
+ final Journal journal = context.mock(Journal.class);
+
+ DataFileAccessorPool underTest = new DataFileAccessorPool(journal);
+
+ context.checking(new Expectations(){{exactly(1).of(journal).getInflightWrites();}});
+
+ DataFile dataFile = new DataFile(new File("aa"), 1);
+ underTest.closeDataFileAccessor(underTest.openDataFileAccessor(dataFile));
+
+ assertEquals("one in the pool", 1, underTest.size());
+ underTest.disposeUnused();
+
+ assertEquals("0 in the pool", 0, underTest.size());
+
+ context.assertIsSatisfied();
+ }
+
+}
\ No newline at end of file