You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/06/01 21:33:18 UTC
[3/5] activemq-artemis git commit: Refactored JDBC Sequential File Factory
Refactored JDBC Sequential File Factory
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/466d43c6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/466d43c6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/466d43c6
Branch: refs/heads/master
Commit: 466d43c63dc3b61e2be41721f799a15afbf02064
Parents: ae73001
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue May 31 12:15:52 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Jun 1 16:09:42 2016 +0100
----------------------------------------------------------------------
.../activemq/artemis/jdbc/store/JDBCUtils.java | 24 +-
.../jdbc/store/drivers/AbstractJDBCDriver.java | 88 +++++
.../store/drivers/derby/DerbySQLProvider.java | 59 ++++
.../jdbc/store/file/JDBCFileFactoryDriver.java | 344 +++++++++++++++++++
.../jdbc/store/file/JDBCSequentialFile.java | 172 ++++------
.../store/file/JDBCSequentialFileFactory.java | 97 ++----
.../jdbc/store/file/sql/DerbySQLProvider.java | 52 ---
.../jdbc/store/file/sql/GenericSQLProvider.java | 143 --------
.../jdbc/store/file/sql/SQLProvider.java | 46 ---
.../jdbc/store/journal/JDBCJournalImpl.java | 132 +++----
.../jdbc/store/sql/GenericSQLProvider.java | 201 +++++++++++
.../artemis/jdbc/store/sql/SQLProvider.java | 60 ++++
.../file/JDBCSequentialFileFactoryTest.java | 9 +-
.../impl/journal/JDBCJournalStorageManager.java | 6 +-
.../artemis/tests/util/ActiveMQTestBase.java | 38 +-
.../largemessage/LargeMessageTestBase.java | 7 +
.../RolesConfigurationStorageTest.java | 1 -
.../persistence/StorageManagerTestBase.java | 12 +-
.../integration/xa/BasicXaRecoveryTest.java | 3 +
19 files changed, 973 insertions(+), 521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index bc04ab9..a1bde56 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -23,9 +23,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class JDBCUtils {
@@ -73,4 +74,21 @@ public class JDBCUtils {
return new GenericSQLProvider(tableName);
}
}
+
+ public static JDBCFileFactoryDriver getDBFileDriver(String driverClass, String tableName, String jdbcConnectionUrl) throws SQLException {
+ JDBCFileFactoryDriver dbDriver;
+ if (driverClass.contains("derby")) {
+ dbDriver = new JDBCFileFactoryDriver();
+ dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
+ dbDriver.setConnectionURL(jdbcConnectionUrl);
+ dbDriver.setDriverClass(driverClass);
+ }
+ else {
+ dbDriver = new JDBCFileFactoryDriver();
+ dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
+ dbDriver.setConnectionURL(jdbcConnectionUrl);
+ dbDriver.setDriverClass(driverClass);
+ }
+ return dbDriver;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
new file mode 100644
index 0000000..1c282c0
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+/**
+ * Class to hold common database functionality such as drivers and connections
+ */
+public abstract class AbstractJDBCDriver {
+
+ protected Connection connection;
+
+ protected final SQLProvider sqlProvider;
+
+ protected final String jdbcConnectionUrl;
+
+ protected final String jdbcDriverClass;
+
+ protected Driver dbDriver;
+
+ public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
+ this.jdbcConnectionUrl = jdbcConnectionUrl;
+ this.jdbcDriverClass = jdbcDriverClass;
+ this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName);
+ }
+
+ public void start() throws Exception {
+ connect();
+ createSchema();
+ prepareStatements();
+ }
+
+ public void stop() throws Exception {
+ if (sqlProvider.closeConnectionOnShutdown()) {
+ connection.close();
+ }
+ }
+
+ protected abstract void prepareStatements() throws SQLException;
+
+ protected abstract void createSchema() throws SQLException;
+
+ protected void createTable(String schemaSql) throws SQLException {
+ JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
+ }
+
+ protected void connect() throws Exception {
+ try {
+ dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
+ connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
+ }
+ catch (SQLException e) {
+ ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
+ throw new RuntimeException("Error connecting to database", e);
+ }
+ }
+
+ public void destroy() throws Exception {
+ connection.setAutoCommit(false);
+ Statement statement = connection.createStatement();
+ statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
+ statement.close();
+ connection.commit();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
new file mode 100644
index 0000000..d9cbed4
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.derby;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+
+public class DerbySQLProvider extends GenericSQLProvider {
+
+ // Derby max blob size = 2G
+ private static final int MAX_BLOB_SIZE = 2147483647;
+
+ private final String createFileTableSQL;
+
+ private final String appendToFileSQL;
+
+ public DerbySQLProvider(String tableName) {
+ super(tableName);
+
+ createFileTableSQL = "CREATE TABLE " + tableName +
+ "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
+ "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+
+ appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
+ }
+
+ @Override
+ public int getMaxBlobSize() {
+ return MAX_BLOB_SIZE;
+ }
+
+ @Override
+ public String getCreateFileTableSQL() {
+ return createFileTableSQL;
+ }
+
+ @Override
+ public String getAppendToLargeObjectSQL() {
+ return appendToFileSQL;
+ }
+
+ @Override
+ public boolean closeConnectionOnShutdown() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
new file mode 100644
index 0000000..04af009
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+
+public class JDBCFileFactoryDriver {
+
+ protected Connection connection;
+
+ protected SQLProvider sqlProvider;
+
+ protected PreparedStatement deleteFile;
+
+ protected PreparedStatement createFile;
+
+ protected PreparedStatement selectFileByFileName;
+
+ protected PreparedStatement copyFileRecord;
+
+ protected PreparedStatement renameFile;
+
+ protected PreparedStatement readLargeObject;
+
+ protected PreparedStatement appendToLargeObject;
+
+ protected PreparedStatement selectFileNamesByExtension;
+
+ protected String connectionUrl;
+
+ protected String driverClass;
+
+ public JDBCFileFactoryDriver() {
+ }
+
+ public void setConnectionURL(String connectionUrl) {
+ this.connectionUrl = connectionUrl;
+ }
+
+ public void setSqlProvider(SQLProvider sqlProvider) {
+ this.sqlProvider = sqlProvider;
+ }
+
+ public void setDriverClass(String driverClass) {
+ this.driverClass = driverClass;
+ }
+
+ public void start() throws Exception {
+ Driver driver = JDBCUtils.getDriver(driverClass);
+ connection = driver.connect(connectionUrl, new Properties());
+ JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), sqlProvider.getCreateFileTableSQL());
+ prepareStatements();
+ }
+
+ public void stop() throws SQLException {
+ if (sqlProvider.closeConnectionOnShutdown())
+ connection.close();
+ }
+
+ protected void prepareStatements() throws SQLException {
+ this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
+ this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
+ this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
+ this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
+ this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+ this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
+ this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
+ this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+ }
+
+ public synchronized List<String> listFiles(String extension) throws Exception {
+ List<String> fileNames = new ArrayList<>();
+ try {
+ connection.setAutoCommit(false);
+ selectFileNamesByExtension.setString(1, extension);
+ try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
+ while (rs.next()) {
+ fileNames.add(rs.getString(1));
+ }
+ }
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ return fileNames;
+ }
+
+ /**
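+ * Opens the supplied file. If no file with this name exists in the database a new, empty one is created;
+ * otherwise the file takes the ID of the existing record and is loaded from it.
+ *
+ * @param file the file to open
+ * @throws SQLException if a database access error occurs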
+ */
+ public void openFile(JDBCSequentialFile file) throws SQLException {
+ int fileId = fileExists(file);
+ if (fileId < 0) {
+ createFile(file);
+ }
+ else {
+ file.setId(fileId);
+ loadFile(file);
+ }
+ }
+
+ /**
+ * Checks to see if a file with the same file name exists in the database.
+ *
+ * @param file the file whose name is looked up
+ * @return the ID of the matching file, or -1 if there is none
+ * @throws SQLException if a database access error occurs
+ */
+ public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
+ connection.setAutoCommit(false);
+ selectFileByFileName.setString(1, file.getFileName());
+ try (ResultSet rs = selectFileByFileName.executeQuery()) {
+ int id = rs.next() ? rs.getInt(1) : -1;
+ connection.commit();
+ return id;
+ }
+ catch (Exception e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
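+ * Loads an existing file, setting its write position to the length of the data stored under its ID.
+ *
+ * @param file the file to load; its ID is used for the lookup
+ * @throws SQLException if a database access error occurs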
+ */
+ public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
+
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ file.setWritePosition((int) rs.getBlob(1).length());
+ }
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
+ * Creates a new database row representing the supplied file.
+ *
+ * @param file
+ * @throws SQLException
+ */
+ public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+ createFile.setString(1, file.getFileName());
+ createFile.setString(2, file.getExtension());
+ createFile.setBytes(3, new byte[0]);
+ createFile.executeUpdate();
+ try (ResultSet keys = createFile.getGeneratedKeys()) {
+ keys.next();
+ file.setId(keys.getInt(1));
+ }
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
+ * Updates the fileName field to the new value.
+ *
+ * @param file
+ * @param newFileName
+ * @throws SQLException
+ */
+ public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+ renameFile.setString(1, newFileName);
+ renameFile.setInt(2, file.getId());
+ renameFile.executeUpdate();
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
+ * Deletes the associated row in the database.
+ *
+ * @param file
+ * @throws SQLException
+ */
+ public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+ deleteFile.setInt(1, file.getId());
+ deleteFile.executeUpdate();
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
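+ * Persists data to the database record associated with this file.
+ *
+ * @param file the file to write to
+ * @param data the bytes to write
+ * @return the number of bytes written (data.length)
+ * @throws SQLException if a database access error occurs; the transaction is rolled back first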
+ */
+ public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+ appendToLargeObject.setBytes(1, data);
+ appendToLargeObject.setInt(2, file.getId());
+ appendToLargeObject.executeUpdate();
+ connection.commit();
+ return data.length;
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
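+ * Reads data from the file (starting at file.position()) into the byte buffer.
+ *
+ * @param file the file to read from
+ * @param bytes the buffer to fill; at most bytes.remaining() bytes are read
+ * @return the number of bytes read, or 0 if no record is found for the file
+ * @throws SQLException if a database access error occurs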
+ */
+ public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
+ int readLength = 0;
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ Blob blob = rs.getBlob(1);
+ readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
+ byte[] data = blob.getBytes(file.position() + 1, (int) readLength);
+ bytes.put(data);
+ }
+ connection.commit();
+ return readLength;
+ }
+ catch (Throwable e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
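+ * Copies the data content of fileFrom to fileTo.
+ *
+ * @param fileFrom the file whose data is copied
+ * @param fileTo the file that receives the data
+ * @throws SQLException if a database access error occurs; the transaction is rolled back first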
+ */
+ public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+ copyFileRecord.setInt(1, fileFrom.getId());
+ copyFileRecord.setInt(2, fileTo.getId());
+ copyFileRecord.executeUpdate();
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ /**
+ * Drop all tables and data
+ */
+ public synchronized void destroy() throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+ Statement statement = connection.createStatement();
+ statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
+ long bytesRemaining = objectLength - readPosition;
+ if (bytesRemaining > bufferSpace) {
+ return bufferSpace;
+ }
+ else {
+ return bytesRemaining;
+ }
+ }
+
+ public int getMaxSize() {
+ return sqlProvider.getMaxBlobSize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 73bec72..6b91223 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -19,24 +19,20 @@ package org.apache.activemq.artemis.jdbc.store.file;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
public class JDBCSequentialFile implements SequentialFile {
@@ -53,20 +49,6 @@ public class JDBCSequentialFile implements SequentialFile {
private int id = -1;
- private final PreparedStatement appendToFile;
-
- private final PreparedStatement deleteFile;
-
- private final PreparedStatement readFile;
-
- private final PreparedStatement createFile;
-
- private final PreparedStatement selectFileByFileName;
-
- private final PreparedStatement copyFileRecord;
-
- private final PreparedStatement renameFile;
-
private long readPosition = 0;
private long writePosition = 0;
@@ -75,33 +57,30 @@ public class JDBCSequentialFile implements SequentialFile {
private JDBCSequentialFileFactory fileFactory;
- private int maxSize;
+ private final Object writeLock;
- private SQLProvider sqlProvider;
+ private final JDBCFileFactoryDriver dbDriver;
- private final Object writeLock;
+ private static final Logger log = Logger.getLogger(JDBCSequentialFile.class.getName());
+
+ // Allows DB Drivers to cache meta data.
+ private Map<Object, Object> metaData = new ConcurrentHashMap<>();
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
- final SQLProvider sqlProvider,
final Executor executor,
+ final JDBCFileFactoryDriver driver,
final Object writeLock) throws SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
this.executor = executor;
- this.maxSize = sqlProvider.getMaxBlobSize();
- this.sqlProvider = sqlProvider;
this.writeLock = writeLock;
+ this.dbDriver = driver;
+ }
- Connection connection = fileFactory.getConnection();
- this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL());
- this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
- this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
- this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL());
- this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
- this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
- this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+ public void setWritePosition(int writePosition) {
+ this.writePosition = writePosition;
}
@Override
@@ -117,35 +96,11 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public synchronized void open() throws Exception {
if (!isOpen) {
- try {
- synchronized (writeLock) {
- selectFileByFileName.setString(1, filename);
-
- try (ResultSet rs = selectFileByFileName.executeQuery()) {
- if (!rs.next()) {
- createFile.setString(1, filename);
- createFile.setString(2, extension);
- createFile.setBytes(3, new byte[0]);
- createFile.executeUpdate();
- try (ResultSet keys = createFile.getGeneratedKeys()) {
- keys.next();
- this.id = keys.getInt(1);
- }
- }
- else {
- this.id = rs.getInt(1);
- this.writePosition = rs.getBlob(4).length();
- }
- }
- }
- }
- catch (SQLException e) {
- ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e);
- isOpen = false;
+ synchronized (writeLock) {
+ dbDriver.openFile(this);
+ isCreated = true;
+ isOpen = true;
}
-
- isCreated = true;
- isOpen = true;
}
}
@@ -156,7 +111,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public boolean fits(int size) {
- return writePosition + size <= maxSize;
+ return writePosition + size <= dbDriver.getMaxSize();
}
@Override
@@ -183,24 +138,20 @@ public class JDBCSequentialFile implements SequentialFile {
public void delete() throws IOException, InterruptedException, ActiveMQException {
try {
if (isCreated) {
- deleteFile.setInt(1, id);
- deleteFile.executeUpdate();
+ synchronized (writeLock) {
+ dbDriver.deleteFile(this);
+ }
}
}
catch (SQLException e) {
- throw new IOException(e);
+ throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e);
}
}
private synchronized int internalWrite(byte[] data, IOCallback callback) {
try {
synchronized (writeLock) {
- int noBytes = data.length;
- appendToFile.setBytes(1, data);
- appendToFile.setInt(2, id);
- int result = appendToFile.executeUpdate();
- if (result < 1)
- throw new ActiveMQException("No record found for file id: " + id);
+ int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes);
if (callback != null)
callback.done();
@@ -295,36 +246,19 @@ public class JDBCSequentialFile implements SequentialFile {
}
@Override
- public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException {
+ public synchronized int read(ByteBuffer bytes, final IOCallback callback) throws SQLException {
synchronized (writeLock) {
- readFile.setInt(1, id);
- try (ResultSet rs = readFile.executeQuery()) {
- if (rs.next()) {
- Blob blob = rs.getBlob(1);
-
- long bytesRemaining = blob.length() - readPosition;
- byte[] data;
- if (bytesRemaining > bytes.remaining()) {
- // First index into blob is 1 (not 0)
- data = blob.getBytes(readPosition + 1, bytes.remaining());
- }
- else {
- // First index into blob is 1 (not 0)
- data = blob.getBytes(readPosition + 1, (int) bytesRemaining);
- }
-
- bytes.put(data);
- readPosition += data.length;
- if (callback != null)
- callback.done();
-
- return data.length;
- }
- return 0;
+ try {
+ int read = dbDriver.readFromFile(this, bytes);
+ readPosition += read;
+ if (callback != null)
+ callback.done();
+ return read;
}
catch (Exception e) {
if (callback != null)
callback.onError(-1, e.getMessage());
+ e.printStackTrace();
return 0;
}
}
@@ -352,8 +286,20 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void sync() throws IOException {
- // (mtaylor) We always write straight away, so we don't need to do anything here.
- // (mtaylor) Is this meant to be blocking?
+ final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ callback.done();
+ }
+ });
+
+ try {
+ callback.waitCompletion();
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
}
@Override
@@ -363,15 +309,15 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void renameTo(String newFileName) throws Exception {
- renameFile.setString(1, newFileName);
- renameFile.setInt(2, id);
- renameFile.executeUpdate();
+ synchronized (writeLock) {
+ dbDriver.renameFile(this, newFileName);
+ }
}
@Override
public SequentialFile cloneFile() {
try {
- JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock);
+ JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
return clone;
}
catch (Exception e) {
@@ -385,9 +331,9 @@ public class JDBCSequentialFile implements SequentialFile {
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
clone.open();
- copyFileRecord.setInt(1, id);
- copyFileRecord.setInt(2, clone.getId());
- copyFileRecord.executeUpdate();
+ synchronized (writeLock) {
+ dbDriver.copyFileData(this, clone);
+ }
}
public int getId() {
@@ -416,4 +362,16 @@ public class JDBCSequentialFile implements SequentialFile {
public File getJavaFile() {
return null;
}
+
+ public void addMetaData(Object key, Object value) {
+ metaData.put(key, value);
+ }
+
+ public Object removeMetaData(Object key) {
+ return metaData.remove(key);
+ }
+
+ public Object getMetaData(Object key) {
+ return metaData.get(key);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 4231907..07e30a9 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -18,17 +18,11 @@ package org.apache.activemq.artemis.jdbc.store.file;
import java.io.File;
import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -36,53 +30,59 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
- private Connection connection;
-
- private String connectionUrl;
-
- private final Driver driver;
-
private boolean started;
- private final String tableName;
-
private List<JDBCSequentialFile> files;
- private PreparedStatement selectFileNamesByExtension;
-
private Executor executor;
- private SQLProvider sqlProvider;
-
private Map<String, Object> fileLocks = new HashMap<>();
+ private final JDBCFileFactoryDriver dbDriver;
+
public JDBCSequentialFileFactory(final String connectionUrl,
final String tableName,
final String className,
Executor executor) throws Exception {
- this.connectionUrl = connectionUrl;
this.executor = executor;
- this.tableName = tableName.toUpperCase();
-
files = new ArrayList<>();
- sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName);
- driver = JDBCUtils.getDriver(className);
+ dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl);
}
- public Connection getConnection() {
- return connection;
+ @Override
+ public synchronized void start() {
+ try {
+ if (!started) {
+ dbDriver.start();
+ started = true;
+ }
+ }
+ catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
+ started = false;
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ try {
+ dbDriver.stop();
+ }
+ catch (SQLException e) {
+ ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
+ }
+ started = false;
}
@Override
public SequentialFile createSequentialFile(String fileName) {
try {
fileLocks.putIfAbsent(fileName, new Object());
- JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName));
+ JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, dbDriver, fileLocks.get(fileName));
files.add(file);
return file;
}
@@ -99,15 +99,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override
public List<String> listFiles(String extension) throws Exception {
- List<String> fileNames = new ArrayList<>();
-
- selectFileNamesByExtension.setString(1, extension);
- try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
- while (rs.next()) {
- fileNames.add(rs.getString(1));
- }
- }
- return fileNames;
+ return dbDriver.listFiles(extension);
}
@Override
@@ -171,7 +163,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override
public void activateBuffer(SequentialFile file) {
-
}
@Override
@@ -180,34 +171,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
}
@Override
- public synchronized void start() {
- try {
- if (!started) {
- connection = driver.connect(connectionUrl, new Properties());
- JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL());
- selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
- started = true;
- }
- }
- catch (SQLException e) {
- ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
- started = false;
- }
- }
-
- @Override
- public synchronized void stop() {
- try {
- if (false)
- connection.close();
- }
- catch (SQLException e) {
- ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
- }
- started = false;
- }
-
- @Override
public boolean isStarted() {
return started;
}
@@ -218,12 +181,8 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override
public void flush() {
-
}
public synchronized void destroy() throws SQLException {
- Statement statement = connection.createStatement();
- statement.executeUpdate(sqlProvider.getDropFileTableSQL());
- stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
deleted file mode 100644
index c14036e..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
-
-public class DerbySQLProvider extends GenericSQLProvider {
-
- // Derby max blob size = 2G
- private static final int MAX_BLOB_SIZE = 2147483647;
-
- private final String createFileTableSQL;
-
- private final String appendToFileSQL;
-
- public DerbySQLProvider(String tableName) {
- super(tableName);
-
- createFileTableSQL = "CREATE TABLE " + tableName +
- "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
- "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
-
- appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
- }
-
- @Override
- public int getMaxBlobSize() {
- return MAX_BLOB_SIZE;
- }
-
- @Override
- public String getCreateFileTableSQL() {
- return createFileTableSQL;
- }
-
- @Override
- public String getAppendToFileSQL() {
- return appendToFileSQL;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
deleted file mode 100644
index c95edb3..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
-
-public class GenericSQLProvider implements SQLProvider {
-
- // Default to lowest (MYSQL = 64k)
- private static final int MAX_BLOB_SIZE = 64512;
-
- private final String tableName;
-
- private final String createFileTableSQL;
-
- private final String insertFileSQL;
-
- private final String selectFileNamesByExtensionSQL;
-
- private final String selectIdByFileNameSQL;
-
- private final String appendToFileSQL;
-
- private final String readFileSQL;
-
- private final String deleteFileSQL;
-
- private final String updateFileNameByIdSQL;
-
- private final String copyFileRecordByIdSQL;
-
- private final String cloneFileRecordSQL;
-
- private final String dropFileTableSQL;
-
- public GenericSQLProvider(String tableName) {
- this.tableName = tableName;
-
- createFileTableSQL = "CREATE TABLE " + tableName +
- "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
-
- insertFileSQL = "INSERT INTO " + tableName +
- " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
-
- selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
-
- selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
-
- appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
-
- readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
-
- deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
-
- updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
-
- cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
- "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
-
- copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
-
- dropFileTableSQL = "DROP TABLE " + tableName;
- }
-
- @Override
- public int getMaxBlobSize() {
- return MAX_BLOB_SIZE;
- }
-
- @Override
- public String getTableName() {
- return tableName;
- }
-
- @Override
- public String getCreateFileTableSQL() {
- return createFileTableSQL;
- }
-
- @Override
- public String getInsertFileSQL() {
- return insertFileSQL;
- }
-
- @Override
- public String getSelectFileByFileName() {
- return selectIdByFileNameSQL;
- }
-
- @Override
- public String getSelectFileNamesByExtensionSQL() {
- return selectFileNamesByExtensionSQL;
- }
-
- @Override
- public String getAppendToFileSQL() {
- return appendToFileSQL;
- }
-
- @Override
- public String getReadFileSQL() {
- return readFileSQL;
- }
-
- @Override
- public String getDeleteFileSQL() {
- return deleteFileSQL;
- }
-
- @Override
- public String getUpdateFileNameByIdSQL() {
- return updateFileNameByIdSQL;
- }
-
- @Override
- public String getCopyFileRecordByIdSQL() {
- return copyFileRecordByIdSQL;
- }
-
- @Override
- public String getCloneFileRecordByIdSQL() {
- return cloneFileRecordSQL;
- }
-
- @Override
- public String getDropFileTableSQL() {
- return dropFileTableSQL;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
deleted file mode 100644
index e9fe36c..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
-
-public interface SQLProvider {
-
- int getMaxBlobSize();
-
- String getTableName();
-
- String getCreateFileTableSQL();
-
- String getInsertFileSQL();
-
- String getSelectFileNamesByExtensionSQL();
-
- String getSelectFileByFileName();
-
- String getAppendToFileSQL();
-
- String getReadFileSQL();
-
- String getDeleteFileSQL();
-
- String getUpdateFileNameByIdSQL();
-
- String getCopyFileRecordByIdSQL();
-
- String getDropFileTableSQL();
-
- String getCloneFileRecordByIdSQL();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index f253167..1e6f393 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -17,21 +17,15 @@
package org.apache.activemq.artemis.jdbc.store.journal;
-import java.sql.Connection;
-import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -44,23 +38,18 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
-public class JDBCJournalImpl implements Journal {
+public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
// Sync Delay in ms
public static final int SYNC_DELAY = 5;
private static int USER_VERSION = 1;
- private final String tableName;
-
- private final String jdbcDriverClass;
-
- private Connection connection;
-
- private List<JDBCJournalRecord> records;
+ private final List<JDBCJournalRecord> records;
private PreparedStatement insertJournalRecords;
@@ -74,13 +63,9 @@ public class JDBCJournalImpl implements Journal {
private boolean started;
- private String jdbcUrl;
-
private Timer syncTimer;
- private Driver dbDriver;
-
- private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+ private final Object journalLock = new Object();
private final String timerThread;
@@ -90,68 +75,48 @@ public class JDBCJournalImpl implements Journal {
// Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0);
+ private Logger logger = Logger.getLogger(this.getClass());
+
public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) {
- this.tableName = tableName;
- this.jdbcUrl = jdbcUrl;
- this.jdbcDriverClass = jdbcDriverClass;
+ super(tableName, jdbcUrl, jdbcDriverClass);
timerThread = "Timer JDBC Journal(" + tableName + ")";
-
records = new ArrayList<>();
}
@Override
public void start() throws Exception {
- dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
-
- try {
- connection = dbDriver.connect(jdbcUrl, new Properties());
- }
- catch (SQLException e) {
- ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcUrl);
- throw new RuntimeException("Error connecting to database", e);
- }
-
- JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName));
-
- insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
- selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
- countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
- deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
- deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName));
-
+ super.start();
syncTimer = new Timer(timerThread, true);
syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
-
started = true;
}
- @Override
- public void stop() throws Exception {
- stop(true);
+ protected void createSchema() throws SQLException {
+ createTable(sqlProvider.getCreateJournalTableSQL());
}
- public synchronized void stop(boolean shutdownConnection) throws Exception {
- if (started) {
- journalLock.writeLock().lock();
-
- syncTimer.cancel();
+ protected void prepareStatements() throws SQLException {
+ insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
+ selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
+ countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL());
+ deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL());
+ deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL());
+ }
- sync();
- if (shutdownConnection) {
- connection.close();
+ @Override
+ public synchronized void stop() throws Exception {
+ if (started) {
+ synchronized (journalLock) {
+ syncTimer.cancel();
+ sync();
+ started = false;
+ super.stop();
}
-
- started = false;
- journalLock.writeLock().unlock();
}
}
public synchronized void destroy() throws Exception {
- connection.setAutoCommit(false);
- Statement statement = connection.createStatement();
- statement.executeUpdate("DROP TABLE " + tableName);
- statement.close();
- connection.commit();
+ super.destroy();
stop();
}
@@ -159,8 +124,11 @@ public class JDBCJournalImpl implements Journal {
if (!started)
return 0;
- List<JDBCJournalRecord> recordRef = records;
- records = new ArrayList<JDBCJournalRecord>();
+ List<JDBCJournalRecord> recordRef = new ArrayList<>();
+ synchronized (records) {
+ recordRef.addAll(records);
+ records.clear();
+ }
// We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
List<Long> deletedRecords = new ArrayList<>();
@@ -215,12 +183,18 @@ public class JDBCJournalImpl implements Journal {
deleteJournalTxRecords.executeBatch();
connection.commit();
-
- cleanupTxRecords(deletedRecords, committedTransactions);
success = true;
}
catch (SQLException e) {
- performRollback(connection, recordRef);
+ performRollback(recordRef);
+ }
+
+ try {
+ if (success)
+ cleanupTxRecords(deletedRecords, committedTransactions);
+ }
+ catch (SQLException e) {
+ e.printStackTrace();
}
executeCallbacks(recordRef, success);
@@ -230,12 +204,11 @@ public class JDBCJournalImpl implements Journal {
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
we remove the Tx Records (i.e. PREPARE, COMMIT). */
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
-
+ connection.rollback();
List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values());
-
for (Long txId : committedTx) {
transactions.get(txId).committed = true;
}
@@ -260,9 +233,8 @@ public class JDBCJournalImpl implements Journal {
}
}
- private void performRollback(Connection connection, List<JDBCJournalRecord> records) {
+ private void performRollback(List<JDBCJournalRecord> records) {
try {
- connection.rollback();
for (JDBCJournalRecord record : records) {
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
removeTxRecord(record);
@@ -306,18 +278,18 @@ public class JDBCJournalImpl implements Journal {
record.setIoCompletion(callback);
}
- try {
- journalLock.writeLock().lock();
+ synchronized (journalLock) {
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
addTxRecord(record);
}
- records.add(record);
- }
- finally {
- journalLock.writeLock().unlock();
+
+ synchronized (records) {
+ records.add(record);
+ }
}
- if (callback != null) callback.waitCompletion();
+ if (callback != null)
+ callback.waitCompletion();
}
private synchronized void addTxRecord(JDBCJournalRecord record) {
@@ -703,12 +675,12 @@ public class JDBCJournalImpl implements Journal {
@Override
public final void synchronizationLock() {
- journalLock.writeLock().lock();
+ logger.error("Replication is not supported with JDBC Store");
}
@Override
public final void synchronizationUnlock() {
- journalLock.writeLock().unlock();
+ logger.error("Replication is not supported with JDBC Store");
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
new file mode 100644
index 0000000..6efa170
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.sql;
+
+public class GenericSQLProvider implements SQLProvider {
+
+ // Default to lowest (MYSQL = 64k)
+ private static final int MAX_BLOB_SIZE = 64512;
+
+ protected final String tableName;
+
+ private final String createFileTableSQL;
+
+ private final String insertFileSQL;
+
+ private final String selectFileNamesByExtensionSQL;
+
+ private final String selectIdByFileNameSQL;
+
+ private final String appendToFileSQL;
+
+ private final String readLargeObjectSQL;
+
+ private final String deleteFileSQL;
+
+ private final String updateFileNameByIdSQL;
+
+ private final String copyFileRecordByIdSQL;
+
+ private final String cloneFileRecordSQL;
+
+ private final String dropFileTableSQL;
+
+ private final String createJournalTableSQL;
+
+ private final String insertJournalRecordsSQL;
+
+ private final String selectJournalRecordsSQL;
+
+ private final String deleteJournalRecordsSQL;
+
+ private final String deleteJournalTxRecordsSQL;
+
+ private final String countJournalRecordsSQL;
+
+ public GenericSQLProvider(String tableName) {
+ this.tableName = tableName;
+
+ createFileTableSQL = "CREATE TABLE " + tableName +
+ "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+
+ insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
+
+ selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
+
+ selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
+
+ appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
+
+ readLargeObjectSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
+
+ deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
+
+ updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
+
+ cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
+ "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
+
+ copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
+
+ dropFileTableSQL = "DROP TABLE " + tableName;
+
+ createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
+
+ insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
+
+ selectJournalRecordsSQL = "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC";
+
+ deleteJournalRecordsSQL = "DELETE FROM " + tableName + " WHERE id = ?";
+
+ deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
+
+ countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
+ }
+
+ @Override
+ public int getMaxBlobSize() {
+ return MAX_BLOB_SIZE;
+ }
+
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ // Journal SQL Statements
+ @Override
+ public String getCreateJournalTableSQL() {
+ return createJournalTableSQL;
+ }
+
+ @Override
+ public String getInsertJournalRecordsSQL() {
+ return insertJournalRecordsSQL;
+ }
+
+ @Override
+ public String getSelectJournalRecordsSQL() {
+ return selectJournalRecordsSQL;
+ }
+
+ @Override
+ public String getDeleteJournalRecordsSQL() {
+ return deleteJournalRecordsSQL;
+ }
+
+ @Override
+ public String getDeleteJournalTxRecordsSQL() {
+ return deleteJournalTxRecordsSQL;
+ }
+
+ @Override
+ public String getCountJournalRecordsSQL() {
+ return countJournalRecordsSQL;
+ }
+
+ // Large Message Statements
+ @Override
+ public String getCreateFileTableSQL() {
+ return createFileTableSQL;
+ }
+
+ @Override
+ public String getInsertFileSQL() {
+ return insertFileSQL;
+ }
+
+ @Override
+ public String getSelectFileByFileName() {
+ return selectIdByFileNameSQL;
+ }
+
+ @Override
+ public String getSelectFileNamesByExtensionSQL() {
+ return selectFileNamesByExtensionSQL;
+ }
+
+ @Override
+ public String getAppendToLargeObjectSQL() {
+ return appendToFileSQL;
+ }
+
+ @Override
+ public String getReadLargeObjectSQL() {
+ return readLargeObjectSQL;
+ }
+
+ @Override
+ public String getDeleteFileSQL() {
+ return deleteFileSQL;
+ }
+
+ @Override
+ public String getUpdateFileNameByIdSQL() {
+ return updateFileNameByIdSQL;
+ }
+
+ @Override
+ public String getCopyFileRecordByIdSQL() {
+ return copyFileRecordByIdSQL;
+ }
+
+ @Override
+ public String getCloneFileRecordByIdSQL() {
+ return cloneFileRecordSQL;
+ }
+
+ @Override
+ public String getDropFileTableSQL() {
+ return dropFileTableSQL;
+ }
+
+ @Override
+ public boolean closeConnectionOnShutdown() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
new file mode 100644
index 0000000..5645ebc
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.sql;
+
+public interface SQLProvider {
+
+ int getMaxBlobSize();
+
+ String getCreateJournalTableSQL();
+
+ String getInsertJournalRecordsSQL();
+
+ String getSelectJournalRecordsSQL();
+
+ String getDeleteJournalRecordsSQL();
+
+ String getDeleteJournalTxRecordsSQL();
+
+ String getTableName();
+
+ String getCreateFileTableSQL();
+
+ String getInsertFileSQL();
+
+ String getSelectFileNamesByExtensionSQL();
+
+ String getSelectFileByFileName();
+
+ String getAppendToLargeObjectSQL();
+
+ String getReadLargeObjectSQL();
+
+ String getDeleteFileSQL();
+
+ String getUpdateFileNameByIdSQL();
+
+ String getCopyFileRecordByIdSQL();
+
+ String getDropFileTableSQL();
+
+ String getCloneFileRecordByIdSQL();
+
+ String getCountJournalRecordsSQL();
+
+ boolean closeConnectionOnShutdown();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
index bd200bd..1a8fba4 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -63,7 +63,7 @@ public class JDBCSequentialFileFactoryTest {
}
@After
- public void tearDown() throws SQLException {
+ public void tearDown() throws Exception {
factory.destroy();
}
@@ -126,8 +126,8 @@ public class JDBCSequentialFileFactoryTest {
JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt");
file.copyTo(copy);
- checkData(copy, src);
checkData(file, src);
+ checkData(copy, src);
}
@Test
@@ -145,7 +145,12 @@ public class JDBCSequentialFileFactoryTest {
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
+ assertEquals(bufferSize, file.size());
JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile();
+ copy.open();
+
+ assertEquals(bufferSize, copy.size());
+ assertEquals(bufferSize, file.size());
}
private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 25b7cd1..a5d55ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -88,10 +88,8 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
beforeStop();
- ((JDBCJournalImpl) bindingsJournal).stop(false);
-
- ((JDBCJournalImpl) messageJournal).stop(false);
-
+ bindingsJournal.stop();
+ messageJournal.stop();
largeMessagesFactory.stop();
singleThreadExecutor.shutdown();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 86d8cf6..31ad1d6 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -40,6 +40,10 @@ import java.lang.management.ManagementFactory;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.ServerSocket;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -124,6 +128,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
@@ -448,12 +454,34 @@ public abstract class ActiveMQTestBase extends Assert {
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
- dbStorageConfiguration.setMessageTableName("MESSAGES");
- dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
+ dbStorageConfiguration.setMessageTableName("MESSAGE");
+ dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
+ dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
configuration.setStoreConfiguration(dbStorageConfiguration);
}
+   /**
+    * Drops each of the given tables from the test database if it currently exists.
+    * <p>
+    * Failures while obtaining the driver, opening the connection or creating the statement propagate
+    * to the caller. Failures while looking up or dropping a table are printed and otherwise ignored,
+    * and abort the remaining drops, so the cleanup stays best-effort.
+    *
+    * @param tableNames table names to drop; each is resolved through the {@link SQLProvider} obtained
+    *                   for the configured driver class before being looked up and dropped
+    * @throws Exception if the connection or statement cannot be created, or a JDBC resource fails to close
+    */
+   public void destroyTables(List<String> tableNames) throws Exception {
+      Driver driver = JDBCUtils.getDriver(getJDBCClassName());
+      // try-with-resources closes the statement and the connection even when createStatement() or the loop fails.
+      try (Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
+           Statement statement = connection.createStatement()) {
+         try {
+            for (String tableName : tableNames) {
+               SQLProvider sqlProvider = JDBCUtils.getSQLProvider(getJDBCClassName(), tableName);
+               boolean tableExists;
+               // Close the metadata cursor before issuing the DROP so no result set is left open.
+               try (ResultSet rs = connection.getMetaData().getTables(null, null, sqlProvider.getTableName(), null)) {
+                  tableExists = rs.next();
+               }
+               if (tableExists) {
+                  statement.execute("DROP TABLE " + sqlProvider.getTableName());
+               }
+            }
+         }
+         catch (Throwable e) {
+            // Deliberately best-effort: a failed drop must not fail the test teardown.
+            e.printStackTrace();
+         }
+      }
+   }
+
protected Map<String, Object> generateInVMParams(final int node) {
Map<String, Object> params = new HashMap<>();
@@ -797,7 +825,11 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected final String getTestJDBCConnectionUrl() {
- return "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true";
+ // The jdbc.connection.url system property, when set, overrides the default embedded Derby URL located under the test directory.
+ return System.getProperty("jdbc.connection.url", "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true");
+ }
+
+   /**
+    * Resolves the JDBC driver class name used for the test JDBC store.
+    *
+    * @return the value of the {@code jdbc.driver.class} system property, or the embedded Derby
+    *         driver class name when that property is not set
+    */
+   protected final String getJDBCClassName() {
+      final String derbyEmbeddedDriver = "org.apache.derby.jdbc.EmbeddedDriver";
+      return System.getProperty("jdbc.driver.class", derbyEmbeddedDriver);
}
protected final File getTestDirfile() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
index 6664d1e..03ed5b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
@@ -79,6 +79,13 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
this.storeType = storeType;
}
+   /**
+    * Runs the base teardown, then drops the JDBC tables when this run uses the DATABASE store.
+    * <p>
+    * The drop sits in a {@code finally} block so that a failure in {@code super.tearDown()} does not
+    * skip the table cleanup.
+    *
+    * @throws Exception if the base teardown or the table cleanup fails
+    */
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         super.tearDown();
+      }
+      finally {
+         if (storeType == StoreConfiguration.StoreType.DATABASE) {
+            destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE"));
+         }
+      }
+   }
+
@Parameterized.Parameters(name = "storeType={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
index 391cccf..4f4c5de 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
@@ -40,7 +40,6 @@ public class RolesConfigurationStorageTest extends StorageManagerTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
-
mapExpectedSets = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 75e5eb2..886cde3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
-import java.sql.DriverManager;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -105,15 +103,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal = null;
}
- // Stops the database engine early to stop thread leaks showing.
- if (storeType == StoreConfiguration.StoreType.DATABASE) {
- try {
- DriverManager.getConnection("jdbc:derby:;shutdown=true");
- }
- catch (SQLException e) {
- }
- }
-
+ destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"}));
super.tearDown();
if (exception != null)
throw exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
index 5394565..06a3f77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
@@ -125,6 +125,9 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
public void tearDown() throws Exception {
MBeanServerFactory.releaseMBeanServer(mbeanServer);
super.tearDown();
+ // Drop the JDBC tables after the base teardown has run, and only when this run uses the DATABASE store.
+ if (storeType == StoreConfiguration.StoreType.DATABASE) {
+ destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE"));
+ }
}
@Test