You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/12/12 14:56:34 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4934 - jdbc
persistence adapter traped messages
Updated Branches:
refs/heads/trunk e1e8c5b08 -> dcedd9fe9
https://issues.apache.org/jira/browse/AMQ-4934 - jdbc persistence adapter traped messages
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dcedd9fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dcedd9fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dcedd9fe
Branch: refs/heads/trunk
Commit: dcedd9fe96a208d9465c73c370821525e148d398
Parents: e1e8c5b
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Dec 12 14:54:48 2013 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Dec 12 14:55:19 2013 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 29 +-
.../util/DefaultIOExceptionHandler.java | 15 +
.../bugs/TrapMessageInJDBCStoreTest.java | 306 +++++++++++++++++++
3 files changed, 342 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/dcedd9fe/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 31b60b8..29b65b2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -109,6 +109,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private final AtomicLong pendingWakeups = new AtomicLong();
private boolean allConsumersExclusiveByDefault = false;
+ private boolean resetNeeded;
+
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
public void run() {
@@ -872,14 +874,21 @@ public class Queue extends BaseDestination implements Task, UsageListener {
sendLock.lockInterruptibly();
try {
if (store != null && message.isPersistent()) {
- message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
- if (messages.isCacheEnabled()) {
- result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
- } else {
- store.addMessage(context, message);
- }
- if (isReduceMemoryFootprint()) {
- message.clearMarshalledState();
+ try {
+ message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+ if (messages.isCacheEnabled()) {
+ result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
+ } else {
+ store.addMessage(context, message);
+ }
+ if (isReduceMemoryFootprint()) {
+ message.clearMarshalledState();
+ }
+ } catch (Exception e) {
+ // we may have a store in inconsistent state, so reset the cursor
+ // before restarting normal broker operations
+ resetNeeded = true;
+ throw e;
}
}
// did a transaction commit beat us to the index?
@@ -1115,6 +1124,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return allConsumersExclusiveByDefault;
}
+ public boolean isResetNeeded() {
+ return resetNeeded;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
private QueueMessageReference createMessageReference(Message message) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/dcedd9fe/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
index c65ec65..cf47f23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
@@ -18,11 +18,16 @@ package org.apache.activemq.util;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SuppressReplyException;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,6 +100,16 @@ import org.slf4j.LoggerFactory;
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
if (hasLockOwnership()) {
+ Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
+ for (Destination destination : destinations.values()) {
+
+ if (destination instanceof Queue) {
+ Queue queue = (Queue)destination;
+ if (queue.isResetNeeded()) {
+ queue.clearPendingMessages();
+ }
+ }
+ }
broker.startAllConnectors();
LOG.info("Successfully restarted transports on " + broker);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/dcedd9fe/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
new file mode 100644
index 0000000..13bd755
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.*;
+import org.apache.activemq.store.jdbc.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.Message;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test to demostrate a message trapped in the JDBC store and not
+ * delivered to consumer
+ *
+ * The test throws issues the commit to the DB but throws
+ * an exception back to the broker. This scenario could happen when a network
+ * cable is disconnected - message is committed to DB but broker does not know.
+ *
+ *
+ */
+
+public class TrapMessageInJDBCStoreTest extends TestCase {
+
+ private static final String MY_TEST_Q = "MY_TEST_Q";
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TrapMessageInJDBCStoreTest.class);
+ private String transportUrl = "tcp://127.0.0.1:0";
+ private BrokerService broker;
+ private TestTransactionContext testTransactionContext;
+ private TestJDBCPersistenceAdapter jdbc;
+
+ protected BrokerService createBroker(boolean withJMX) throws Exception {
+ BrokerService broker = new BrokerService();
+
+ broker.setUseJmx(withJMX);
+
+ EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+ embeddedDataSource.setCreateDatabase("create");
+
+ //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
+ // method that can be configured to throw a SQL exception on demand
+ jdbc = new TestJDBCPersistenceAdapter();
+ jdbc.setDataSource(embeddedDataSource);
+ testTransactionContext = new TestTransactionContext(jdbc);
+
+ jdbc.setLockKeepAlivePeriod(1000l);
+ LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+ leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+ jdbc.setLocker(leaseDatabaseLocker);
+
+ broker.setPersistenceAdapter(jdbc);
+
+ broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
+
+ transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
+ return broker;
+ }
+
+ /**
+ *
+ * sends 3 messages to the queue. When the second message is being committed to the JDBCStore, $
+ * it throws a dummy SQL exception - the message has been committed to the embedded DB before the exception
+ * is thrown
+ *
+ * Excepted correct outcome: receive 3 messages and the DB should contain no messages
+ *
+ * @throws Exception
+ */
+
+ public void testDBCommitException() throws Exception {
+
+ broker = this.createBroker(false);
+ broker.deleteAllMessages();
+ broker.start();
+ broker.waitUntilStarted();
+
+ LOG.info("***Broker started...");
+
+ // failover but timeout in 5 seconds so the test does not hang
+ String failoverTransportURL = "failover:(" + transportUrl
+ + ")?timeout=5000";
+
+
+ sendMessage(MY_TEST_Q, failoverTransportURL);
+
+ List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL);
+
+ //check db contents
+ ArrayList<Long> dbSeq = dbMessageCount();
+
+ LOG.debug("*** db contains message seq " +dbSeq );
+
+ assertEquals("number of messages in DB after test",0,dbSeq.size());
+ assertEquals("number of consumed messages",3,consumedMessages.size());
+
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+
+
+ public List<TextMessage> consumeMessages(String queue,
+ String transportURL) throws JMSException {
+ Connection connection = null;
+ LOG.debug("*** consumeMessages() called ...");
+
+ try {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ transportURL);
+
+ connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(queue);
+
+ ArrayList<TextMessage> consumedMessages = new ArrayList<TextMessage>();
+
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ while(true){
+ TextMessage textMessage= (TextMessage) messageConsumer.receive(100);
+ LOG.debug("*** consumed Messages :"+textMessage);
+
+ if(textMessage==null){
+ return consumedMessages;
+ }
+ consumedMessages.add(textMessage);
+ }
+
+
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ public void sendMessage(String queue, String transportURL)
+ throws JMSException {
+ Connection connection = null;
+
+ try {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ transportURL);
+
+ connection = factory.createConnection();
+ Session session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(queue);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage m = session.createTextMessage("1");
+
+ testTransactionContext.throwSQLException = false;
+ jdbc.throwSQLException = false;
+
+
+ LOG.debug("*** send message 1 to broker...");
+ producer.send(m);
+
+ testTransactionContext.throwSQLException = true;
+ jdbc.throwSQLException = true;
+
+ // trigger SQL exception in transactionContext
+ LOG.debug("*** send message 2 to broker");
+ m.setText("2");
+
+ // need to reset the flag in a seperate thread during the send
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ testTransactionContext.throwSQLException = false;
+ jdbc.throwSQLException = false;
+ }
+ }, 2 , TimeUnit.SECONDS);
+
+ producer.send(m);
+
+
+ LOG.debug("*** send message 3 to broker");
+ m.setText("3");
+ producer.send(m);
+ LOG.debug("*** Finished sending messages to broker");
+
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ /**
+ * query the DB to see what messages are left in the store
+ * @return
+ * @throws SQLException
+ * @throws IOException
+ */
+ private ArrayList<Long> dbMessageCount() throws SQLException, IOException {
+ java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
+ PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
+
+ try{
+
+ ResultSet result = statement.executeQuery();
+ ArrayList<Long> dbSeq = new ArrayList<Long>();
+
+ while (result.next()){
+ dbSeq.add(result.getLong(1));
+ }
+
+ return dbSeq;
+
+ }finally{
+ statement.close();
+ conn.close();
+
+ }
+
+ }
+
+ /*
+ * Mock classes used for testing
+ */
+
+ public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
+
+ public boolean throwSQLException;
+
+ public TransactionContext getTransactionContext() throws IOException {
+ return testTransactionContext;
+ }
+
+ @Override
+ public void checkpoint(boolean sync) throws IOException {
+ if (throwSQLException) {
+ throw new IOException("checkpoint failed");
+ }
+ super.checkpoint(sync);
+ }
+ }
+
+ public class TestTransactionContext extends TransactionContext {
+
+ public boolean throwSQLException;
+
+ public TestTransactionContext(
+ JDBCPersistenceAdapter jdbcPersistenceAdapter)
+ throws IOException {
+ super(jdbcPersistenceAdapter);
+ }
+
+ public void executeBatch() throws SQLException {
+ //call
+ super.executeBatch();
+
+ if (throwSQLException){
+ throw new SQLException("TEST SQL EXCEPTION from executeBatch");
+ }
+
+
+ }
+
+
+
+
+ }
+
+}
\ No newline at end of file