You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/07/27 10:58:45 UTC

activemq git commit: AMQ-6372 - add IOExceptionHandler to kahadb read path to ensure fast shutdown on disk access errors

Repository: activemq
Updated Branches:
  refs/heads/master 37da75e0e -> 6b8e743b0


AMQ-6372 - add IOExceptionHandler to kahadb read path to ensure fast shutdown on disk access errors


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b8e743b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b8e743b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b8e743b

Branch: refs/heads/master
Commit: 6b8e743b083b9baf201f26b6f7d9a574638ba537
Parents: 37da75e
Author: gtully <ga...@gmail.com>
Authored: Wed Jul 27 11:57:40 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Jul 27 11:58:22 2016 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      | 28 ++++---
 .../JournalCorruptionEofIndexRecoveryTest.java  | 13 +++-
 .../store/kahadb/JournalFdRecoveryTest.java     | 78 +++++++++++++++++++-
 3 files changed, 102 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6b8e743b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 9ed7381..0d20b78 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1230,17 +1230,23 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
      * @throws IOException
      */
     Message loadMessage(Location location) throws IOException {
-        JournalCommand<?> command = load(location);
-        KahaAddMessageCommand addMessage = null;
-        switch (command.type()) {
-            case KAHA_UPDATE_MESSAGE_COMMAND:
-                addMessage = ((KahaUpdateMessageCommand)command).getMessage();
-                break;
-            default:
-                addMessage = (KahaAddMessageCommand) command;
-        }
-        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
-        return msg;
+        try {
+            JournalCommand<?> command = load(location);
+            KahaAddMessageCommand addMessage = null;
+            switch (command.type()) {
+                case KAHA_UPDATE_MESSAGE_COMMAND:
+                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
+                    break;
+                default:
+                    addMessage = (KahaAddMessageCommand) command;
+            }
+            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
+            return msg;
+        } catch (IOException ioe) {
+            LOG.error("Failed to load message at: {}", location , ioe);
+            brokerService.handleIOException(ioe);
+            throw ioe;
+        }
     }
 
     // /////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/activemq/blob/6b8e743b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index faf0022..6cffe3d 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -371,11 +372,15 @@ public class JournalCorruptionEofIndexRecoveryTest {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createConsumer(destination);
         int count = 0;
-        while (count < max && consumer.receive(5000) != null) {
-            count++;
+        try {
+            while (count < max && consumer.receive(5000) != null) {
+                count++;
+            }
+        } catch (JMSException ok) {
+        } finally {
+            consumer.close();
+            connection.close();
         }
-        consumer.close();
-        connection.close();
         return count;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6b8e743b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index 308b18b..2d398e2 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
@@ -28,7 +30,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.management.Attribute;
@@ -36,8 +40,11 @@ import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class JournalFdRecoveryTest {
@@ -80,6 +87,12 @@ public class JournalFdRecoveryTest {
         broker.setDataDirectory(KAHADB_DIRECTORY);
         broker.addConnector("tcp://localhost:0");
 
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setUseCache(false);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
         configurePersistence(broker);
 
         connectionUri = "vm://localhost?create=false";
@@ -115,6 +128,40 @@ public class JournalFdRecoveryTest {
 
 
     @Test
+    public void testStopOnPageInIOError() throws Exception {
+        startBroker();
+
+        int sent = produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFiles = getNumberOfJournalFiles();
+        LOG.info("Num journal files: " + numFiles);
+
+        assertTrue("more than x files: " + numFiles, numFiles > 4);
+
+        File dataDir = broker.getPersistenceAdapter().getDirectory();
+
+        for (int i=2;i<4;i++) {
+            whackDataFile(dataDir, i);
+        }
+
+        final CountDownLatch gotShutdown = new CountDownLatch(1);
+        broker.addShutdownHook(new Runnable() {
+            @Override
+            public void run() {
+                gotShutdown.countDown();
+            }
+        });
+
+        int received = tryConsume(destination, sent);
+        assertNotEquals("not all message received", sent, received);
+        assertTrue("broker got shutdown on page in error", gotShutdown.await(5, TimeUnit.SECONDS));
+    }
+
+    private void whackDataFile(File dataDir, int i) throws Exception {
+        whackFile(dataDir, "db-" + i + ".log");
+    }
+
+    @Test
     public void testRecoveryAfterCorruption() throws Exception {
         startBroker();
 
@@ -160,8 +207,12 @@ public class JournalFdRecoveryTest {
         return result;
     }
 
-    private void whackIndex(File dataDir) {
-        File indexToDelete = new File(dataDir, "db.data");
+    private void whackIndex(File dataDir) throws Exception {
+        whackFile(dataDir, "db.data");
+    }
+
+    private void whackFile(File dataDir, String name) throws Exception {
+        File indexToDelete = new File(dataDir, name);
         LOG.info("Whacking index: " + indexToDelete);
         indexToDelete.delete();
     }
@@ -195,6 +246,29 @@ public class JournalFdRecoveryTest {
         return sent;
     }
 
+    private int tryConsume(Destination destination, int numToGet) throws Exception {
+        int got = 0;
+        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            for (int i = 0; i < numToGet; i++) {
+                if (consumer.receive(4000) == null) {
+                    // give up on timeout or error
+                    break;
+                }
+                got++;
+
+            }
+        } catch (JMSException ok) {
+        } finally {
+            connection.close();
+        }
+
+        return got;
+    }
+
     private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
         return produceMessages(destination, numToSend);
     }