You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by da...@apache.org on 2013/11/01 12:45:44 UTC

git commit: AMQ-4841: Fixed JDBC leased locker to allow being configured in any order in the XML file, while still being able to use the defined statements. Otherwise you would have had to define the locker last in the XML file. Thanks to Pat Fox for the test case.

Updated Branches:
  refs/heads/trunk 855419359 -> 8a8fcb6ef


AMQ-4841: Fixed JDBC leased locker to allow being configured in any order in the XML file, while still being able to use the defined statements. Otherwise you would have had to define the locker last in the XML file. Thanks to Pat Fox for the test case.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8a8fcb6e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8a8fcb6e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8a8fcb6e

Branch: refs/heads/trunk
Commit: 8a8fcb6ef4a8e7c5a35e03a8ea043e0984ef2cf8
Parents: 8554193
Author: Claus Ibsen <cl...@gmail.com>
Authored: Fri Nov 1 12:46:36 2013 +0100
Committer: Claus Ibsen <cl...@gmail.com>
Committed: Fri Nov 1 12:46:36 2013 +0100

----------------------------------------------------------------------
 .../activemq/store/jdbc/AbstractJDBCLocker.java | 16 +++++-
 .../store/jdbc/DefaultDatabaseLocker.java       |  4 +-
 .../store/jdbc/LeaseDatabaseLocker.java         | 10 ++--
 .../jdbc/adapter/TransactDatabaseLocker.java    |  2 +-
 .../store/jdbc/JDBCLockTablePrefixTest.java     | 42 ++++++++++++++
 .../activemq/store/jdbc/JDBCLockTablePrefix.xml | 58 ++++++++++++++++++++
 6 files changed, 121 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8a8fcb6e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
index e3cc801..237e250 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java
@@ -29,18 +29,27 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractJDBCLocker extends AbstractLocker {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class);
     protected DataSource dataSource;
-    protected Statements statements;
+    private Statements statements;
+    protected JDBCPersistenceAdapter jdbcAdapter;
 
     protected boolean createTablesOnStartup;
     protected int queryTimeout = -1;
 
     public void configure(PersistenceAdapter adapter) throws IOException {
         if (adapter instanceof JDBCPersistenceAdapter) {
+            this.jdbcAdapter = (JDBCPersistenceAdapter) adapter;
             this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
-            this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
+            // we cannot get the statements (yet) as they may be configured later
         }
     }
 
+    protected Statements getStatements() {
+        if (statements == null && jdbcAdapter != null) {
+            statements = jdbcAdapter.getStatements();
+        }
+        return statements;
+    }
+
     public void setDataSource(DataSource dataSource) {
         this.dataSource = dataSource;
     }
@@ -94,7 +103,8 @@ public abstract class AbstractJDBCLocker extends AbstractLocker {
     @Override
     public void preStart() {
         if (createTablesOnStartup) {
-            String[] createStatements = this.statements.getCreateLockSchemaStatements();
+
+            String[] createStatements = getStatements().getCreateLockSchemaStatements();
 
             Connection connection = null;
             Statement statement = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a8fcb6e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
index 64d24ab..d7a406a 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
@@ -44,7 +44,7 @@ public class DefaultDatabaseLocker extends AbstractJDBCLocker {
     public void doStart() throws Exception {
 
         LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
-        String sql = statements.getLockCreateStatement();
+        String sql = getStatements().getLockCreateStatement();
         LOG.debug("Locking Query is "+sql);
         
         while (true) {
@@ -158,7 +158,7 @@ public class DefaultDatabaseLocker extends AbstractJDBCLocker {
     public boolean keepAlive() throws IOException {
         boolean result = false;
         try {
-            lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
+            lockUpdateStatement = connection.prepareStatement(getStatements().getLockUpdateStatement());
             lockUpdateStatement.setLong(1, System.currentTimeMillis());
             setQueryTimeout(lockUpdateStatement);
             int rows = lockUpdateStatement.executeUpdate();

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a8fcb6e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
index 029b1df..39f8cfe 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
@@ -52,7 +52,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
         }
 
         LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master");
-        String sql = statements.getLeaseObtainStatement();
+        String sql = getStatements().getLeaseObtainStatement();
         LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
 
         long now = 0l;
@@ -101,7 +101,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
     private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
         PreparedStatement statement = null;
         try {
-            statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
+            statement = connection.prepareStatement(getStatements().getLeaseOwnerStatement());
             ResultSet resultSet = statement.executeQuery();
             while (resultSet.next()) {
                 LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
@@ -123,7 +123,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
     }
 
     protected long determineTimeDifference(Connection connection) throws SQLException {
-        PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
+        PreparedStatement statement = connection.prepareStatement(getStatements().getCurrentDateTime());
         ResultSet resultSet = statement.executeQuery();
         long result = 0l;
         if (resultSet.next()) {
@@ -151,7 +151,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
         PreparedStatement statement = null;
         try {
             connection = getConnection();
-            statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
+            statement = connection.prepareStatement(getStatements().getLeaseUpdateStatement());
             statement.setString(1, null);
             statement.setLong(2, 0l);
             statement.setString(3, getLeaseHolderId());
@@ -169,7 +169,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
     @Override
     public boolean keepAlive() throws IOException {
         boolean result = false;
-        final String sql = statements.getLeaseUpdateStatement();
+        final String sql = getStatements().getLeaseUpdateStatement();
         LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
 
         Connection connection = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a8fcb6e/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
index d69239c..8854165 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
@@ -45,7 +45,7 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
             try {
                 connection = dataSource.getConnection();
                 connection.setAutoCommit(false);
-                String sql = statements.getLockCreateStatement();
+                String sql = getStatements().getLockCreateStatement();
                 statement = connection.prepareStatement(sql);
                 if (statement.getMetaData() != null) {
                     ResultSet rs = statement.executeQuery();

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a8fcb6e/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java
new file mode 100644
index 0000000..e5b47ba
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+
+public class JDBCLockTablePrefixTest extends TestCase {
+
+    public void testLockTable() throws Exception {
+        BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml");
+        broker.waitUntilStarted();
+
+        PersistenceAdapter pa = broker.getPersistenceAdapter();
+        assertNotNull(pa);
+
+        JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) pa;
+        assertEquals("TTT_", jpa.getStatements().getTablePrefix());
+        assertEquals("AMQ_MSGS2", jpa.getStatements().getMessageTableName());
+        assertEquals("AMQ_LOCK2", jpa.getStatements().getLockTableName());
+
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a8fcb6e/activemq-unit-tests/src/test/resources/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
new file mode 100644
index 0000000..ac70fa7
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:amq="http://activemq.apache.org/schema/core"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
+    <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
+                useLoggingForShutdownErrors="true" useJmx="false"
+                persistent="true" vmConnectorURI="vm://javacoola"
+                useShutdownHook="false" deleteAllMessagesOnStartup="true">
+
+      <amq:persistenceAdapter>
+        <amq:jdbcPersistenceAdapter dataDirectory="target/activemq-data" dataSource="#derby-ds" lockKeepAlivePeriod="5000" createTablesOnStartup="true">
+          <!-- test that we can define the locker before the statements,
+          but the locker will still pick up the statements -->
+          <amq:locker>
+            <amq:lease-database-locker lockAcquireSleepInterval="10000"/>
+          </amq:locker>
+          <amq:statements>
+            <amq:statements tablePrefix="TTT_" messageTableName="AMQ_MSGS2" durableSubAcksTableName="AMQ_ACKS2" lockTableName="AMQ_LOCK2"/>
+          </amq:statements>
+          <amq:adapter>
+            <amq:defaultJDBCAdapter/>
+          </amq:adapter>
+        </amq:jdbcPersistenceAdapter>
+      </amq:persistenceAdapter>
+
+        <amq:transportConnectors>
+            <amq:transportConnector uri="vm://brokerConfigTest"/>
+        </amq:transportConnectors>
+
+    </amq:broker>
+
+  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
+    <property name="databaseName" value="target/derbyDb"/>
+    <property name="connectionAttributes" value=";create=true"/>
+  </bean>
+
+</beans>
\ No newline at end of file