You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/12/12 14:56:34 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4934 - jdbc persistence adapter traped messages

Updated Branches:
  refs/heads/trunk e1e8c5b08 -> dcedd9fe9


https://issues.apache.org/jira/browse/AMQ-4934 - jdbc persistence adapter traped messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dcedd9fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dcedd9fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dcedd9fe

Branch: refs/heads/trunk
Commit: dcedd9fe96a208d9465c73c370821525e148d398
Parents: e1e8c5b
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Dec 12 14:54:48 2013 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Dec 12 14:55:19 2013 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  29 +-
 .../util/DefaultIOExceptionHandler.java         |  15 +
 .../bugs/TrapMessageInJDBCStoreTest.java        | 306 +++++++++++++++++++
 3 files changed, 342 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dcedd9fe/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 31b60b8..29b65b2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -109,6 +109,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     private final AtomicLong pendingWakeups = new AtomicLong();
     private boolean allConsumersExclusiveByDefault = false;
 
+    private boolean resetNeeded;
+
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         @Override
         public void run() {
@@ -872,14 +874,21 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         sendLock.lockInterruptibly();
         try {
             if (store != null && message.isPersistent()) {
-                message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-                if (messages.isCacheEnabled()) {
-                    result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
-                } else {
-                    store.addMessage(context, message);
-                }
-                if (isReduceMemoryFootprint()) {
-                    message.clearMarshalledState();
+                try {
+                    message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+                    if (messages.isCacheEnabled()) {
+                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
+                    } else {
+                        store.addMessage(context, message);
+                    }
+                    if (isReduceMemoryFootprint()) {
+                        message.clearMarshalledState();
+                    }
+                } catch (Exception e) {
+                    // we may have a store in inconsistent state, so reset the cursor
+                    // before restarting normal broker operations
+                    resetNeeded = true;
+                    throw e;
                 }
             }
             // did a transaction commit beat us to the index?
@@ -1115,6 +1124,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         return allConsumersExclusiveByDefault;
     }
 
+    public boolean isResetNeeded() {
+        return resetNeeded;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
     private QueueMessageReference createMessageReference(Message message) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/dcedd9fe/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
index c65ec65..cf47f23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
@@ -18,11 +18,16 @@ package org.apache.activemq.util;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.SuppressReplyException;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +100,16 @@ import org.slf4j.LoggerFactory;
                                             TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
                                         }
                                         if (hasLockOwnership()) {
+                                            Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
+                                            for (Destination destination : destinations.values()) {
+
+                                                if (destination instanceof Queue) {
+                                                    Queue queue = (Queue)destination;
+                                                    if (queue.isResetNeeded()) {
+                                                        queue.clearPendingMessages();
+                                                    }
+                                                }
+                                            }
                                             broker.startAllConnectors();
                                             LOG.info("Successfully restarted transports on " + broker);
                                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dcedd9fe/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
new file mode 100644
index 0000000..13bd755
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.*;
+import org.apache.activemq.store.jdbc.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.Message;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test to demostrate a message trapped in the JDBC store and not
+ * delivered to consumer
+ *
+ * The test throws issues the commit to the DB but throws
+ * an exception back to the broker. This scenario could happen when a network
+ * cable is disconnected - message is committed to DB but broker does not know.
+ *
+ *
+ */
+
+public class TrapMessageInJDBCStoreTest extends TestCase {
+
+    private static final String MY_TEST_Q = "MY_TEST_Q";
+    private static final Logger LOG = LoggerFactory
+            .getLogger(TrapMessageInJDBCStoreTest.class);
+    private String transportUrl = "tcp://127.0.0.1:0";
+    private BrokerService broker;
+    private TestTransactionContext testTransactionContext;
+    private TestJDBCPersistenceAdapter jdbc;
+
+    protected BrokerService createBroker(boolean withJMX) throws Exception {
+        BrokerService broker = new BrokerService();
+
+        broker.setUseJmx(withJMX);
+
+        EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+        embeddedDataSource.setCreateDatabase("create");
+
+        //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
+        // method that can be configured to throw a SQL exception on demand
+        jdbc = new TestJDBCPersistenceAdapter();
+        jdbc.setDataSource(embeddedDataSource);
+        testTransactionContext = new TestTransactionContext(jdbc);
+
+        jdbc.setLockKeepAlivePeriod(1000l);
+        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+        leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+        jdbc.setLocker(leaseDatabaseLocker);
+
+        broker.setPersistenceAdapter(jdbc);
+
+        broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
+
+        transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
+        return broker;
+    }
+
+    /**
+     *
+     * sends 3 messages to the queue. When the second message is being committed to the JDBCStore, $
+     * it throws a dummy SQL exception - the message has been committed to the embedded DB before the exception
+     * is thrown
+     *
+     * Excepted correct outcome: receive 3 messages and the DB should contain no messages
+     *
+     * @throws Exception
+     */
+
+    public void testDBCommitException() throws Exception {
+
+        broker = this.createBroker(false);
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+
+        LOG.info("***Broker started...");
+
+        // failover but timeout in 5 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=5000";
+
+
+        sendMessage(MY_TEST_Q, failoverTransportURL);
+
+        List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL);
+
+        //check db contents
+        ArrayList<Long> dbSeq = dbMessageCount();
+
+        LOG.debug("*** db contains message seq " +dbSeq );
+
+        assertEquals("number of messages in DB after test",0,dbSeq.size());
+        assertEquals("number of consumed messages",3,consumedMessages.size());
+
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+
+
+    public List<TextMessage> consumeMessages(String queue,
+                                      String transportURL) throws JMSException {
+        Connection connection = null;
+        LOG.debug("*** consumeMessages() called ...");
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            connection.start();
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(queue);
+
+            ArrayList<TextMessage> consumedMessages = new ArrayList<TextMessage>();
+
+            MessageConsumer messageConsumer = session.createConsumer(destination);
+
+            while(true){
+                TextMessage textMessage= (TextMessage) messageConsumer.receive(100);
+                LOG.debug("*** consumed Messages :"+textMessage);
+
+                if(textMessage==null){
+                    return consumedMessages;
+                }
+                consumedMessages.add(textMessage);
+            }
+
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    public void sendMessage(String queue, String transportURL)
+            throws JMSException {
+        Connection connection = null;
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(queue);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            TextMessage m = session.createTextMessage("1");
+
+            testTransactionContext.throwSQLException = false;
+            jdbc.throwSQLException = false;
+
+
+            LOG.debug("*** send message 1 to broker...");
+            producer.send(m);
+
+            testTransactionContext.throwSQLException = true;
+            jdbc.throwSQLException = true;
+
+            // trigger SQL exception in transactionContext
+            LOG.debug("***  send message 2 to broker");
+            m.setText("2");
+
+            // need to reset the flag in a seperate thread during the send
+            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+            executor.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    testTransactionContext.throwSQLException = false;
+                    jdbc.throwSQLException = false;
+                }
+            }, 2 , TimeUnit.SECONDS);
+
+             producer.send(m);
+
+
+            LOG.debug("***  send  message 3 to broker");
+            m.setText("3");
+            producer.send(m);
+            LOG.debug("*** Finished sending messages to broker");
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     *  query the DB to see what messages are left in the store
+     * @return
+     * @throws SQLException
+     * @throws IOException
+     */
+    private ArrayList<Long> dbMessageCount() throws SQLException, IOException {
+        java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
+        PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
+
+        try{
+
+            ResultSet result = statement.executeQuery();
+            ArrayList<Long> dbSeq = new ArrayList<Long>();
+
+            while (result.next()){
+                dbSeq.add(result.getLong(1));
+            }
+
+            return dbSeq;
+
+        }finally{
+            statement.close();
+            conn.close();
+
+        }
+
+    }
+
+	/*
+     * Mock classes used for testing
+	 */
+
+    public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
+
+        public boolean throwSQLException;
+
+        public TransactionContext getTransactionContext() throws IOException {
+            return testTransactionContext;
+        }
+
+        @Override
+        public void checkpoint(boolean sync) throws IOException {
+            if (throwSQLException) {
+                throw new IOException("checkpoint failed");
+            }
+            super.checkpoint(sync);
+        }
+    }
+
+    public class TestTransactionContext extends TransactionContext {
+
+        public boolean throwSQLException;
+
+        public TestTransactionContext(
+                JDBCPersistenceAdapter jdbcPersistenceAdapter)
+                throws IOException {
+            super(jdbcPersistenceAdapter);
+        }
+
+        public void executeBatch() throws SQLException {
+            //call
+            super.executeBatch();
+
+            if (throwSQLException){
+                throw new SQLException("TEST SQL EXCEPTION from executeBatch");
+            }
+
+
+        }
+
+
+
+
+    }
+
+}
\ No newline at end of file