You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/01/17 20:01:06 UTC
[1/3] activemq-artemis git commit: ARTEMIS-560 Small improvements on
JDBC tests for paging + adding a logger
Repository: activemq-artemis
Updated Branches:
refs/heads/master a79094b6e -> 43aaf52d4
ARTEMIS-560 Small improvements on JDBC tests for paging + adding a logger
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b4cbd36a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b4cbd36a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b4cbd36a
Branch: refs/heads/master
Commit: b4cbd36af4d7b7120f38f4ea63e4e32489350c96
Parents: 118c272
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jan 17 14:57:39 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jan 17 14:58:53 2017 -0500
----------------------------------------------------------------------
.../jdbc/store/file/JDBCSequentialFile.java | 1 +
.../tests/integration/paging/PagingTest.java | 47 ++++++++++----------
2 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b4cbd36a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 3f078c2..69ff11a 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -92,6 +92,7 @@ public class JDBCSequentialFile implements SequentialFile {
try {
return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) {
+ logger.warn(e.getMessage(), e);
return false;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b4cbd36a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index bc614ea..3a932b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -56,10 +56,9 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
@@ -1461,8 +1460,6 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testMissingTXEverythingAcked() throws Exception {
- if (storeType == StoreConfiguration.StoreType.DATABASE) return;
-
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -1516,31 +1513,35 @@ public class PagingTest extends ActiveMQTestBase {
}
session.commit();
session.close();
- } finally {
- try {
- server.stop();
- } catch (Throwable ignored) {
- }
- }
- ArrayList<RecordInfo> records = new ArrayList<>();
- List<PreparedTransactionInfo> list = new ArrayList<>();
+ ArrayList<RecordInfo> records = new ArrayList<>();
+
+ List<PreparedTransactionInfo> list = new ArrayList<>();
+
+ server.getStorageManager().getMessageJournal().stop();
- JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
- jrn.start();
- jrn.load(records, list, null);
+ Journal jrn = server.getStorageManager().getMessageJournal();
+ jrn.start();
+ jrn.load(records, list, null);
- // Delete everything from the journal
- for (RecordInfo info : records) {
- if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE &&
- info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC &&
- info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) {
- jrn.appendDeleteRecord(info.id, false);
+ // Delete everything from the journal
+ for (RecordInfo info : records) {
+ if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE &&
+ info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC &&
+ info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) {
+ jrn.appendDeleteRecord(info.id, false);
+ }
}
- }
- jrn.stop();
+ jrn.stop();
+
+ } finally {
+ try {
+ server.stop();
+ } catch (Throwable ignored) {
+ }
+ }
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
[3/3] activemq-artemis git commit: This closes #962
Posted by cl...@apache.org.
This closes #962
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/43aaf52d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/43aaf52d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/43aaf52d
Branch: refs/heads/master
Commit: 43aaf52d423e30a96251dbfbe96e0782aeefbf6a
Parents: a79094b b4cbd36
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jan 17 14:59:14 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jan 17 14:59:14 2017 -0500
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 7 +
.../jdbc/store/drivers/AbstractJDBCDriver.java | 79 +++---
.../store/drivers/derby/DerbySQLProvider.java | 2 +-
.../artemis/jdbc/store/file/JDBCFileUtils.java | 12 +
.../jdbc/store/file/JDBCSequentialFile.java | 8 +-
.../store/file/JDBCSequentialFileFactory.java | 12 +
.../file/JDBCSequentialFileFactoryDriver.java | 253 +++++++++++--------
.../PostgresSequentialSequentialFileDriver.java | 171 +++++++------
.../storage/DatabaseStorageConfiguration.java | 10 +
.../deployers/impl/FileConfigurationParser.java | 1 +
.../paging/impl/PagingStoreFactoryDatabase.java | 213 ++++++++++++++++
.../core/paging/impl/PagingStoreFactoryNIO.java | 3 +-
.../impl/journal/JDBCJournalStorageManager.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 11 +-
.../resources/schema/artemis-configuration.xsd | 7 +
.../artemis/tests/util/ActiveMQTestBase.java | 3 +-
.../test/resources/database-store-config.xml | 1 +
.../integration/paging/GlobalPagingTest.java | 10 +
.../tests/integration/paging/PagingTest.java | 70 +++--
19 files changed, 624 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-560 Add Support For JDBC
Paging
Posted by cl...@apache.org.
ARTEMIS-560 Add Support For JDBC Paging
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/118c272c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/118c272c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/118c272c
Branch: refs/heads/master
Commit: 118c272c771ac4f2df168d6ef0278c8ade7b700d
Parents: a79094b
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Jan 12 11:16:48 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jan 17 14:58:53 2017 -0500
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 7 +
.../jdbc/store/drivers/AbstractJDBCDriver.java | 79 +++---
.../store/drivers/derby/DerbySQLProvider.java | 2 +-
.../artemis/jdbc/store/file/JDBCFileUtils.java | 12 +
.../jdbc/store/file/JDBCSequentialFile.java | 7 +-
.../store/file/JDBCSequentialFileFactory.java | 12 +
.../file/JDBCSequentialFileFactoryDriver.java | 253 +++++++++++--------
.../PostgresSequentialSequentialFileDriver.java | 171 +++++++------
.../storage/DatabaseStorageConfiguration.java | 10 +
.../deployers/impl/FileConfigurationParser.java | 1 +
.../paging/impl/PagingStoreFactoryDatabase.java | 213 ++++++++++++++++
.../core/paging/impl/PagingStoreFactoryNIO.java | 3 +-
.../impl/journal/JDBCJournalStorageManager.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 11 +-
.../resources/schema/artemis-configuration.xsd | 7 +
.../artemis/tests/util/ActiveMQTestBase.java | 3 +-
.../test/resources/database-store-config.xml | 1 +
.../integration/paging/GlobalPagingTest.java | 10 +
.../tests/integration/paging/PagingTest.java | 27 +-
19 files changed, 601 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 08068b5..8fce7ea 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -432,6 +432,9 @@ public final class ActiveMQDefaultConfiguration {
// Default large messages table name, used with Database storage type
private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
+ // Default large messages table name, used with Database storage type
+ private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
+
// Default period to wait between connection TTL checks
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
@@ -1190,6 +1193,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
}
+ public static String getDefaultPageStoreTableName() {
+ return DEFAULT_PAGE_STORE_TABLE_NAME;
+ }
+
public static long getDefaultConnectionTtlCheckInterval() {
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index d75ea21..1828911 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -36,6 +36,7 @@ import org.jboss.logging.Logger;
/**
* Class to hold common database functionality such as drivers and connections
*/
+@SuppressWarnings("SynchronizeOnNonFinalField")
public abstract class AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
@@ -66,17 +67,26 @@ public abstract class AbstractJDBCDriver {
public void start() throws SQLException {
connect();
- createSchema();
- prepareStatements();
+ synchronized (connection) {
+ createSchema();
+ prepareStatements();
+ }
+ }
+
+ public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
+ this.connection = connection;
+ this.sqlProvider = sqlProvider;
}
public void stop() throws SQLException {
- if (sqlProvider.closeConnectionOnShutdown()) {
- try {
- connection.close();
- } catch (SQLException e) {
- logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
- throw e;
+ synchronized (connection) {
+ if (sqlProvider.closeConnectionOnShutdown()) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+ throw e;
+ }
}
}
}
@@ -90,30 +100,32 @@ public abstract class AbstractJDBCDriver {
}
private void connect() throws SQLException {
- if (dataSource != null) {
- try {
- connection = dataSource.getConnection();
- } catch (SQLException e) {
- logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
- throw e;
- }
- } else {
- try {
- if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
- throw new IllegalStateException("jdbcDriverClass is null or empty!");
- }
- if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
- throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
+ if (connection == null) {
+ if (dataSource != null) {
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+ throw e;
}
- final Driver dbDriver = getDriver(jdbcDriverClass);
- connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
- if (connection == null) {
- throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
+ } else {
+ try {
+ if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
+ throw new IllegalStateException("jdbcDriverClass is null or empty!");
+ }
+ if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
+ throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
+ }
+ final Driver dbDriver = getDriver(jdbcDriverClass);
+ connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
+ if (connection == null) {
+ throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
+ }
+ } catch (SQLException e) {
+ logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+ ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
+ throw e;
}
- } catch (SQLException e) {
- logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
- ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
- throw e;
}
}
}
@@ -206,8 +218,10 @@ public abstract class AbstractJDBCDriver {
return connection;
}
- public void setConnection(Connection connection) {
- this.connection = connection;
+ public final void setConnection(Connection connection) {
+ if (connection == null) {
+ this.connection = connection;
+ }
}
public void setSqlProvider(SQLProvider sqlProvider) {
@@ -225,4 +239,5 @@ public abstract class AbstractJDBCDriver {
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
index 121c6f7..281ea88 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
@@ -29,7 +29,7 @@ public class DerbySQLProvider extends GenericSQLProvider {
private final String appendToFileSQL;
private DerbySQLProvider(String tableName) {
- super(tableName);
+ super(tableName.toUpperCase());
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
index 02b1128..58494b0 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
@@ -18,6 +18,7 @@
package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
+import java.sql.Connection;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
@@ -45,4 +46,15 @@ class JDBCFileUtils {
}
return dbDriver;
}
+
+ static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
+ JDBCSequentialFileFactoryDriver dbDriver;
+ if (provider instanceof PostgresSQLProvider) {
+ dbDriver = new PostgresSequentialSequentialFileDriver();
+ dbDriver.setConnection(connection);
+ } else {
+ dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
+ }
+ return dbDriver;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 34b6a4f..3f078c2 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -88,7 +88,12 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public boolean exists() {
- return isCreated;
+ if (isCreated) return true;
+ try {
+ return fileFactory.listFiles(extension).contains(filename);
+ } catch (Exception e) {
+ return false;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 008e000..4b92c71 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.io.File;
import java.nio.ByteBuffer;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -60,6 +61,17 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
}
+ public JDBCSequentialFileFactory(final Connection connection,
+ final SQLProvider sqlProvider,
+ final Executor executor) throws Exception {
+ this.executor = executor;
+ this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
+ }
+
+ public JDBCSequentialFileFactoryDriver getDbDriver() {
+ return dbDriver;
+ }
+
@Override
public SequentialFileFactory setDatasync(boolean enabled) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
index 7b9eaf1..f9f206a 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.nio.ByteBuffer;
import java.sql.Blob;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -29,6 +30,7 @@ import java.util.List;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+@SuppressWarnings("SynchronizeOnNonFinalField")
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
protected PreparedStatement deleteFile;
@@ -55,6 +57,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
super(dataSource, provider);
}
+ JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
+ super(connection, sqlProvider);
+ }
+
@Override
protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateFileTableSQL());
@@ -72,22 +78,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
}
- public synchronized List<String> listFiles(String extension) throws Exception {
- List<String> fileNames = new ArrayList<>();
- try {
- connection.setAutoCommit(false);
- selectFileNamesByExtension.setString(1, extension);
- try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
- while (rs.next()) {
- fileNames.add(rs.getString(1));
+ public List<String> listFiles(String extension) throws Exception {
+ synchronized (connection) {
+ List<String> fileNames = new ArrayList<>();
+ try {
+ connection.setAutoCommit(false);
+ selectFileNamesByExtension.setString(1, extension);
+ try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
+ while (rs.next()) {
+ fileNames.add(rs.getString(1));
+ }
}
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ return fileNames;
}
- return fileNames;
}
/**
@@ -113,16 +121,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return
* @throws SQLException
*/
- public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
- connection.setAutoCommit(false);
- selectFileByFileName.setString(1, file.getFileName());
- try (ResultSet rs = selectFileByFileName.executeQuery()) {
- int id = rs.next() ? rs.getInt(1) : -1;
- connection.commit();
- return id;
- } catch (Exception e) {
- connection.rollback();
- throw e;
+ public int fileExists(JDBCSequentialFile file) throws SQLException {
+ try {
+ synchronized (connection) {
+ connection.setAutoCommit(false);
+ selectFileByFileName.setString(1, file.getFileName());
+ try (ResultSet rs = selectFileByFileName.executeQuery()) {
+ int id = rs.next() ? rs.getInt(1) : -1;
+ connection.commit();
+ return id;
+ } catch (Exception e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+ } catch (NullPointerException npe) {
+ npe.printStackTrace();
+ throw npe;
}
}
@@ -132,18 +147,20 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param file
* @throws SQLException
*/
- public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
- connection.setAutoCommit(false);
- readLargeObject.setInt(1, file.getId());
+ public void loadFile(JDBCSequentialFile file) throws SQLException {
+ synchronized (connection) {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
- try (ResultSet rs = readLargeObject.executeQuery()) {
- if (rs.next()) {
- file.setWritePosition((int) rs.getBlob(1).length());
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ file.setWritePosition((int) rs.getBlob(1).length());
+ }
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
}
}
@@ -153,21 +170,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param file
* @throws SQLException
*/
- public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
- try {
- connection.setAutoCommit(false);
- createFile.setString(1, file.getFileName());
- createFile.setString(2, file.getExtension());
- createFile.setBytes(3, new byte[0]);
- createFile.executeUpdate();
- try (ResultSet keys = createFile.getGeneratedKeys()) {
- keys.next();
- file.setId(keys.getInt(1));
+ public void createFile(JDBCSequentialFile file) throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ createFile.setString(1, file.getFileName());
+ createFile.setString(2, file.getExtension());
+ createFile.setBytes(3, new byte[0]);
+ createFile.executeUpdate();
+ try (ResultSet keys = createFile.getGeneratedKeys()) {
+ keys.next();
+ file.setId(keys.getInt(1));
+ }
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
}
}
@@ -178,16 +197,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param newFileName
* @throws SQLException
*/
- public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
- try {
- connection.setAutoCommit(false);
- renameFile.setString(1, newFileName);
- renameFile.setInt(2, file.getId());
- renameFile.executeUpdate();
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ renameFile.setString(1, newFileName);
+ renameFile.setInt(2, file.getId());
+ renameFile.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
}
}
@@ -197,15 +218,17 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param file
* @throws SQLException
*/
- public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
- try {
- connection.setAutoCommit(false);
- deleteFile.setInt(1, file.getId());
- deleteFile.executeUpdate();
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ public void deleteFile(JDBCSequentialFile file) throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ deleteFile.setInt(1, file.getId());
+ deleteFile.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
}
}
@@ -217,17 +240,19 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return
* @throws SQLException
*/
- public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
- try {
- connection.setAutoCommit(false);
- appendToLargeObject.setBytes(1, data);
- appendToLargeObject.setInt(2, file.getId());
- appendToLargeObject.executeUpdate();
- connection.commit();
- return data.length;
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ appendToLargeObject.setBytes(1, data);
+ appendToLargeObject.setInt(2, file.getId());
+ appendToLargeObject.executeUpdate();
+ connection.commit();
+ return data.length;
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
}
}
@@ -239,22 +264,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return
* @throws SQLException
*/
- public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
- connection.setAutoCommit(false);
- readLargeObject.setInt(1, file.getId());
- int readLength = 0;
- try (ResultSet rs = readLargeObject.executeQuery()) {
- if (rs.next()) {
- Blob blob = rs.getBlob(1);
- readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
- byte[] data = blob.getBytes(file.position() + 1, readLength);
- bytes.put(data);
+ public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+ synchronized (connection) {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
+ int readLength = 0;
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ Blob blob = rs.getBlob(1);
+ readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
+ byte[] data = blob.getBytes(file.position() + 1, readLength);
+ bytes.put(data);
+ }
+ connection.commit();
+ return readLength;
+ } catch (Throwable e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- return readLength;
- } catch (Throwable e) {
- connection.rollback();
- throw e;
}
}
@@ -265,16 +292,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param fileTo
* @throws SQLException
*/
- public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
- try {
- connection.setAutoCommit(false);
- copyFileRecord.setInt(1, fileFrom.getId());
- copyFileRecord.setInt(2, fileTo.getId());
- copyFileRecord.executeUpdate();
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ copyFileRecord.setInt(1, fileFrom.getId());
+ copyFileRecord.setInt(2, fileTo.getId());
+ copyFileRecord.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
}
}
@@ -282,16 +311,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* Drop all tables and data
*/
@Override
- public synchronized void destroy() throws SQLException {
- try {
- connection.setAutoCommit(false);
- try (Statement statement = connection.createStatement()) {
- statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+ public void destroy() throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+ }
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
index c7411a6..8c0f975 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
@@ -24,6 +24,7 @@ import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
+@SuppressWarnings("SynchronizeOnNonFinalField")
public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
@@ -33,105 +34,115 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
}
@Override
- public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
- try {
- connection.setAutoCommit(false);
+ public void createFile(JDBCSequentialFile file) throws SQLException {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
- LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
- long oid = lobjManager.createLO();
+ LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+ long oid = lobjManager.createLO();
- createFile.setString(1, file.getFileName());
- createFile.setString(2, file.getExtension());
- createFile.setLong(3, oid);
- createFile.executeUpdate();
+ createFile.setString(1, file.getFileName());
+ createFile.setString(2, file.getExtension());
+ createFile.setLong(3, oid);
+ createFile.executeUpdate();
- try (ResultSet keys = createFile.getGeneratedKeys()) {
- keys.next();
- file.setId(keys.getInt(1));
+ try (ResultSet keys = createFile.getGeneratedKeys()) {
+ keys.next();
+ file.setId(keys.getInt(1));
+ }
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
}
}
@Override
- public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
- connection.setAutoCommit(false);
- readLargeObject.setInt(1, file.getId());
+ public void loadFile(JDBCSequentialFile file) throws SQLException {
+ synchronized (connection) {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
- try (ResultSet rs = readLargeObject.executeQuery()) {
- if (rs.next()) {
- file.setWritePosition(getPostGresLargeObjectSize(file));
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ file.setWritePosition(getPostGresLargeObjectSize(file));
+ }
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
}
}
@Override
- public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
- LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
- LargeObject largeObject = null;
+ public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+ synchronized (connection) {
+ LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+ LargeObject largeObject = null;
- Long oid = getOID(file);
- try {
- connection.setAutoCommit(false);
- largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
- largeObject.seek(largeObject.size());
- largeObject.write(data);
- largeObject.close();
- connection.commit();
- } catch (Exception e) {
- connection.rollback();
- throw e;
+ Long oid = getOID(file);
+ try {
+ connection.setAutoCommit(false);
+ largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
+ largeObject.seek(largeObject.size());
+ largeObject.write(data);
+ largeObject.close();
+ connection.commit();
+ } catch (Exception e) {
+ connection.rollback();
+ throw e;
+ }
+ return data.length;
}
- return data.length;
}
@Override
- public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+ public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
long oid = getOID(file);
- try {
- connection.setAutoCommit(false);
- largeObject = lobjManager.open(oid, LargeObjectManager.READ);
- int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
-
- if (readLength > 0) {
- if (file.position() > 0)
- largeObject.seek((int) file.position());
- byte[] data = largeObject.read(readLength);
- bytes.put(data);
- }
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+ int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
+
+ if (readLength > 0) {
+ if (file.position() > 0)
+ largeObject.seek((int) file.position());
+ byte[] data = largeObject.read(readLength);
+ bytes.put(data);
+ }
- largeObject.close();
- connection.commit();
+ largeObject.close();
+ connection.commit();
- return readLength;
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ return readLength;
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
}
}
- private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
+ private Long getOID(JDBCSequentialFile file) throws SQLException {
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) {
- connection.setAutoCommit(false);
- readLargeObject.setInt(1, file.getId());
- try (ResultSet rs = readLargeObject.executeQuery()) {
- if (rs.next()) {
- file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+ synchronized (connection) {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+ }
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
}
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
}
}
if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
@@ -140,21 +151,23 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
return (Long) file.getMetaData(POSTGRES_OID_KEY);
}
- private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
+ private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
int size = 0;
Long oid = getOID(file);
if (oid != null) {
- try {
- connection.setAutoCommit(false);
- LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
- size = largeObject.size();
- largeObject.close();
- connection.commit();
- } catch (SQLException e) {
- connection.rollback();
- throw e;
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+ size = largeObject.size();
+ largeObject.close();
+ connection.commit();
+ } catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
}
}
return size;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
index 8b20770..eb8b435 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
@@ -30,6 +30,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
+ private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
+
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@@ -67,6 +69,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
this.largeMessagesTableName = largeMessagesTableName;
}
+ public String getPageStoreTableName() {
+ return pageStoreTableName;
+ }
+
+ public void setPageStoreTableName(String pageStoreTableName) {
+ this.pageStoreTableName = pageStoreTableName;
+ }
+
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index dff6f1b..2721214 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1286,6 +1286,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
+ conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK));
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
return conf;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
new file mode 100644
index 0000000..ee9d7bb
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+
+/**
+ * Integration point between Paging and JDBC
+ */
+public class PagingStoreFactoryDatabase implements PagingStoreFactory {
+
+ // Constants -----------------------------------------------------
+
+ private static final String ADDRESS_FILE = "address.txt";
+
+ private static final String DIRECTORY_NAME = "directory.txt";
+
+ // Attributes ----------------------------------------------------
+
+ protected final boolean syncNonTransactional;
+
+ private PagingManager pagingManager;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final long syncTimeout;
+
+ protected final StorageManager storageManager;
+
+ private JDBCSequentialFileFactoryDriver dbDriver;
+
+ private DatabaseStorageConfiguration dbConf;
+
+ private ExecutorFactory executorFactory;
+
+ private JDBCSequentialFileFactory pagingFactoryFileFactory;
+
+ private JDBCSequentialFile directoryList;
+
+ public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
+ final StorageManager storageManager,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional,
+ final IOCriticalErrorListener critialErrorListener) throws Exception {
+ this.storageManager = storageManager;
+ this.executorFactory = executorFactory;
+ this.syncNonTransactional = syncNonTransactional;
+ this.scheduledExecutor = scheduledExecutor;
+ this.syncTimeout = syncTimeout;
+ this.dbConf = dbConf;
+
+ if (dbConf.getDataSource() != null) {
+ SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
+ if (sqlProviderFactory == null) {
+ sqlProviderFactory = new GenericSQLProvider.Factory();
+ }
+ pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
+ } else {
+ String driverClassName = dbConf.getJdbcDriverClassName();
+ pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
+ }
+ pagingFactoryFileFactory.start();
+ directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
+ directoryList.open();
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public void stop() {
+ pagingFactoryFileFactory.stop();
+ }
+
+ @Override
+ public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+ }
+
+ @Override
+ public PageCursorProvider newCursorProvider(PagingStore store,
+ StorageManager storageManager,
+ AddressSettings addressSettings,
+ Executor executor) {
+ return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+ }
+
+ @Override
+ public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
+
+ return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
+ }
+
+ @Override
+ public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
+ String guid = UUIDGenerator.getInstance().generateStringUUID();
+ SequentialFileFactory factory = newFileFactory(guid, true);
+ factory.start();
+
+ SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
+ file.open();
+
+ ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
+ buffer.writeSimpleString(address);
+ file.write(buffer, true);
+ return factory;
+ }
+
+ @Override
+ public void setPagingManager(final PagingManager pagingManager) {
+ this.pagingManager = pagingManager;
+ }
+
+ @Override
+ public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
+ // We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
+ int size = ((Long) directoryList.size()).intValue();
+ ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
+
+ ArrayList<PagingStore> storesReturn = new ArrayList<>();
+
+ while (buffer.readableBytes() > 0) {
+ SimpleString guid = buffer.readSimpleString();
+
+ JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false);
+ factory.start();
+
+ JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
+ addressFile.open();
+
+ size = ((Long) addressFile.size()).intValue();
+ if (size == 0) {
+ continue;
+ }
+
+ ActiveMQBuffer addrBuffer = readActiveMQBuffer(addressFile, size);
+ SimpleString address = addrBuffer.readSimpleString();
+
+ AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
+
+ PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
+
+ storesReturn.add(store);
+ }
+ return storesReturn;
+ }
+
+ private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
+ SimpleString simpleString = SimpleString.toSimpleString(directoryName);
+ ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
+ buffer.writeSimpleString(simpleString);
+ if (writeToDirectory) directoryList.write(buffer, true);
+ return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor());
+ }
+
+ private String getTableNameForGUID(String guid) {
+ return dbConf.getPageStoreTableName() + guid.replace("-", "");
+ }
+
+ private ActiveMQBuffer readActiveMQBuffer(SequentialFile file, int size) throws Exception {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+ byteBuffer.mark();
+ file.read(byteBuffer);
+ byteBuffer.reset();
+
+ ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(byteBuffer);
+ buffer.writerIndex(size);
+ return buffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index e0f3a22..823baf8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -70,7 +70,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
private final long syncTimeout;
- private final StorageManager storageManager;
+ protected final StorageManager storageManager;
private final IOCriticalErrorListener critialErrorListener;
@@ -187,6 +187,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
}
private SequentialFileFactory newFileFactory(final String directoryName) {
+
return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 4e5c447..416da0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
+import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -34,6 +35,8 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager {
+ private Connection connection;
+
public JDBCJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
ExecutorFactory ioExecutorFactory,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 373e210..5c392cc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@@ -70,8 +71,10 @@ import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
@@ -1899,11 +1902,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.queueFactory = factory;
}
- protected PagingManager createPagingManager() {
+ protected PagingManager createPagingManager() throws Exception {
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
}
- protected PagingStoreFactoryNIO getPagingStoreFactory() {
+ protected PagingStoreFactory getPagingStoreFactory() throws Exception {
+ if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
+ DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
+ return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, false, shutdownOnCriticalIO);
+ }
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 4ba268f..c23a2ea 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1719,6 +1719,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
+ <xsd:element name="page-store-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
+ <xsd:annotation>
+ <xsd:documentation>
+ The table name used to large message files
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
</xsd:all>
</xsd:complexType>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index e6d68b1..2f12b05 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -455,12 +455,13 @@ public abstract class ActiveMQTestBase extends Assert {
return configuration;
}
- private void setDBStoreType(Configuration configuration) {
+ protected void setDBStoreType(Configuration configuration) {
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
dbStorageConfiguration.setMessageTableName("MESSAGE");
dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
+ dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
configuration.setStoreConfiguration(dbStorageConfiguration);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/artemis-server/src/test/resources/database-store-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/database-store-config.xml b/artemis-server/src/test/resources/database-store-config.xml
index 1fa3bd6..69f9da7 100644
--- a/artemis-server/src/test/resources/database-store-config.xml
+++ b/artemis-server/src/test/resources/database-store-config.xml
@@ -25,6 +25,7 @@
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
<large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>
+ <page-store-table-name>PAGE_STORE_TABLE</page-store-table-name>
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
</database-store>
</store>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index c94f54a..3960b49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
@@ -36,9 +37,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class GlobalPagingTest extends PagingTest {
+ public GlobalPagingTest(StoreConfiguration.StoreType storeType) {
+ super(storeType);
+ }
+
@Override
@Before
public void setUp() throws Exception {
@@ -69,6 +77,8 @@ public class GlobalPagingTest extends PagingTest {
@Test
public void testPagingOverFullDisk() throws Exception {
+ if (storeType == StoreConfiguration.StoreType.DATABASE) return;
+
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/118c272c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 3daaa0e..bc614ea 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -24,6 +24,8 @@ import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -51,6 +53,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
@@ -86,7 +89,10 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class PagingTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(PagingTest.class);
@@ -104,8 +110,19 @@ public class PagingTest extends ActiveMQTestBase {
protected static final int PAGE_SIZE = 10 * 1024;
+ protected final StoreConfiguration.StoreType storeType;
+
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+ public PagingTest(StoreConfiguration.StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ @Parameterized.Parameters(name = "storeType={0}")
+ public static Collection<Object[]> data() {
+ Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
+ return Arrays.asList(params);
+ }
@Before
public void checkLoggerStart() throws Exception {
@@ -123,8 +140,6 @@ public class PagingTest extends ActiveMQTestBase {
}
}
-
-
@Override
@Before
public void setUp() throws Exception {
@@ -1446,6 +1461,8 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testMissingTXEverythingAcked() throws Exception {
+ if (storeType == StoreConfiguration.StoreType.DATABASE) return;
+
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -5633,7 +5650,11 @@ public class PagingTest extends ActiveMQTestBase {
@Override
protected Configuration createDefaultInVMConfig() throws Exception {
- return super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+ Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+ if (storeType == StoreConfiguration.StoreType.DATABASE) {
+ setDBStoreType(configuration);
+ }
+ return configuration;
}
private static final class DummyOperationContext implements OperationContext {