You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/06/01 21:33:16 UTC

[1/5] activemq-artemis git commit: Add Derby dep to Stress Tests pom

Repository: activemq-artemis
Updated Branches:
  refs/heads/master ae7300142 -> e64ea5278


Add Derby dep to Stress Tests pom


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fda6789d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fda6789d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fda6789d

Branch: refs/heads/master
Commit: fda6789d96ce2ffecfb1bc11662d70b4863b931d
Parents: 79904ae
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue May 31 12:47:16 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Jun 1 16:09:42 2016 +0100

----------------------------------------------------------------------
 tests/stress-tests/pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fda6789d/tests/stress-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/stress-tests/pom.xml b/tests/stress-tests/pom.xml
index 4dc7988..5373434 100644
--- a/tests/stress-tests/pom.xml
+++ b/tests/stress-tests/pom.xml
@@ -111,6 +111,12 @@
          <groupId>org.apache.geronimo.specs</groupId>
          <artifactId>geronimo-jms_2.0_spec</artifactId>
       </dependency>
+      <dependency>
+         <groupId>org.apache.derby</groupId>
+         <artifactId>derby</artifactId>
+         <version>${apache.derby.version}</version>
+         <scope>test</scope>
+      </dependency>
 
       <!-- this is for the log assertion -->
       <dependency>


[3/5] activemq-artemis git commit: Refactored JDBC Sequential File Factory

Posted by cl...@apache.org.
Refactored JDBC Sequential File Factory


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/466d43c6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/466d43c6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/466d43c6

Branch: refs/heads/master
Commit: 466d43c63dc3b61e2be41721f799a15afbf02064
Parents: ae73001
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue May 31 12:15:52 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Jun 1 16:09:42 2016 +0100

----------------------------------------------------------------------
 .../activemq/artemis/jdbc/store/JDBCUtils.java  |  24 +-
 .../jdbc/store/drivers/AbstractJDBCDriver.java  |  88 +++++
 .../store/drivers/derby/DerbySQLProvider.java   |  59 ++++
 .../jdbc/store/file/JDBCFileFactoryDriver.java  | 344 +++++++++++++++++++
 .../jdbc/store/file/JDBCSequentialFile.java     | 172 ++++------
 .../store/file/JDBCSequentialFileFactory.java   |  97 ++----
 .../jdbc/store/file/sql/DerbySQLProvider.java   |  52 ---
 .../jdbc/store/file/sql/GenericSQLProvider.java | 143 --------
 .../jdbc/store/file/sql/SQLProvider.java        |  46 ---
 .../jdbc/store/journal/JDBCJournalImpl.java     | 132 +++----
 .../jdbc/store/sql/GenericSQLProvider.java      | 201 +++++++++++
 .../artemis/jdbc/store/sql/SQLProvider.java     |  60 ++++
 .../file/JDBCSequentialFileFactoryTest.java     |   9 +-
 .../impl/journal/JDBCJournalStorageManager.java |   6 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |  38 +-
 .../largemessage/LargeMessageTestBase.java      |   7 +
 .../RolesConfigurationStorageTest.java          |   1 -
 .../persistence/StorageManagerTestBase.java     |  12 +-
 .../integration/xa/BasicXaRecoveryTest.java     |   3 +
 19 files changed, 973 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index bc04ab9..a1bde56 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -23,9 +23,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
 public class JDBCUtils {
 
@@ -73,4 +74,21 @@ public class JDBCUtils {
          return new GenericSQLProvider(tableName);
       }
    }
+
+   public static JDBCFileFactoryDriver getDBFileDriver(String driverClass, String tableName, String jdbcConnectionUrl) throws SQLException {
+      JDBCFileFactoryDriver dbDriver;
+      if (driverClass.contains("derby")) {
+         dbDriver = new JDBCFileFactoryDriver();
+         dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
+         dbDriver.setConnectionURL(jdbcConnectionUrl);
+         dbDriver.setDriverClass(driverClass);
+      }
+      else {
+         dbDriver = new JDBCFileFactoryDriver();
+         dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
+         dbDriver.setConnectionURL(jdbcConnectionUrl);
+         dbDriver.setDriverClass(driverClass);
+      }
+      return dbDriver;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
new file mode 100644
index 0000000..1c282c0
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+/**
+ * Class to hold common database functionality such as drivers and connections
+ */
+public abstract class AbstractJDBCDriver {
+
+   protected Connection connection;
+
+   protected final SQLProvider sqlProvider;
+
+   protected final String jdbcConnectionUrl;
+
+   protected final String jdbcDriverClass;
+
+   protected Driver dbDriver;
+
+   public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
+      this.jdbcConnectionUrl = jdbcConnectionUrl;
+      this.jdbcDriverClass = jdbcDriverClass;
+      this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName);
+   }
+
+   public void start() throws Exception {
+      connect();
+      createSchema();
+      prepareStatements();
+   }
+
+   public void stop() throws Exception {
+      if (sqlProvider.closeConnectionOnShutdown()) {
+         connection.close();
+      }
+   }
+
+   protected abstract void prepareStatements() throws SQLException;
+
+   protected abstract void createSchema() throws SQLException;
+
+   protected void createTable(String schemaSql) throws SQLException {
+      JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
+   }
+
+   protected void connect() throws Exception {
+      try {
+         dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
+         connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
+      }
+      catch (SQLException e) {
+         ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
+         throw new RuntimeException("Error connecting to database", e);
+      }
+   }
+
+   public void destroy() throws Exception {
+      connection.setAutoCommit(false);
+      Statement statement = connection.createStatement();
+      statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
+      statement.close();
+      connection.commit();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
new file mode 100644
index 0000000..d9cbed4
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.derby;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+
+public class DerbySQLProvider extends GenericSQLProvider {
+
+   // Derby max blob size = 2G (2147483647 bytes, i.e. Integer.MAX_VALUE)
+   private static final int MAX_BLOB_SIZE = Integer.MAX_VALUE;
+
+   // Derby-specific DDL for the file table; ID is an identity column generated by the database.
+   private final String createTableStatement;
+
+   // Derby-specific append: concatenates the new bytes onto the existing DATA blob.
+   private final String appendStatement;
+
+   /**
+    * @param tableName name of the file table the generated SQL refers to
+    */
+   public DerbySQLProvider(String tableName) {
+      super(tableName);
+
+      final String columns = "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
+         "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+
+      this.createTableStatement = "CREATE TABLE " + tableName + columns;
+      this.appendStatement = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
+   }
+
+   @Override
+   public String getCreateFileTableSQL() {
+      return this.createTableStatement;
+   }
+
+   @Override
+   public String getAppendToLargeObjectSQL() {
+      return this.appendStatement;
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+
+   // NOTE(review): the connection is deliberately left open on shutdown for Derby; the reason
+   // is not visible here - confirm before changing.
+   @Override
+   public boolean closeConnectionOnShutdown() {
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
new file mode 100644
index 0000000..04af009
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+
+public class JDBCFileFactoryDriver {
+
+   protected Connection connection;
+
+   protected SQLProvider sqlProvider;
+
+   protected PreparedStatement deleteFile;
+
+   protected PreparedStatement createFile;
+
+   protected PreparedStatement selectFileByFileName;
+
+   protected PreparedStatement copyFileRecord;
+
+   protected PreparedStatement renameFile;
+
+   protected PreparedStatement readLargeObject;
+
+   protected PreparedStatement appendToLargeObject;
+
+   protected PreparedStatement selectFileNamesByExtension;
+
+   protected String connectionUrl;
+
+   protected String driverClass;
+
+   public JDBCFileFactoryDriver() {
+   }
+
+   public void setConnectionURL(String connectionUrl) {
+      this.connectionUrl = connectionUrl;
+   }
+
+   public void setSqlProvider(SQLProvider sqlProvider) {
+      this.sqlProvider = sqlProvider;
+   }
+
+   public void setDriverClass(String driverClass) {
+      this.driverClass = driverClass;
+   }
+
+   public void start() throws Exception {
+      Driver driver = JDBCUtils.getDriver(driverClass);
+      connection = driver.connect(connectionUrl, new Properties());
+      JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), sqlProvider.getCreateFileTableSQL());
+      prepareStatements();
+   }
+
+   public void stop() throws SQLException {
+      if (sqlProvider.closeConnectionOnShutdown())
+         connection.close();
+   }
+
+   protected void prepareStatements() throws SQLException {
+      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
+      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
+      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
+      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
+      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
+      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
+      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+   }
+
+   public synchronized List<String> listFiles(String extension) throws Exception {
+      List<String> fileNames = new ArrayList<>();
+      try {
+         connection.setAutoCommit(false);
+         selectFileNamesByExtension.setString(1, extension);
+         try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
+            while (rs.next()) {
+               fileNames.add(rs.getString(1));
+            }
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+      return fileNames;
+   }
+
+   /**
+    * Opens the supplied file.  If the file does not exist in the database it will create a new one.
+    *
+    * @param file
+    * @return
+    * @throws SQLException
+    */
+   public void openFile(JDBCSequentialFile file) throws SQLException {
+      int fileId = fileExists(file);
+      if (fileId < 0) {
+         createFile(file);
+      }
+      else {
+         file.setId(fileId);
+         loadFile(file);
+      }
+   }
+
+   /**
+    * Checks to see if a file with filename and extension exists.  If so returns the ID of the file or returns -1.
+    *
+    * @param file
+    * @return
+    * @throws SQLException
+    */
+   public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      selectFileByFileName.setString(1, file.getFileName());
+      try (ResultSet rs = selectFileByFileName.executeQuery()) {
+         int id = rs.next() ? rs.getInt(1) : -1;
+         connection.commit();
+         return id;
+      }
+      catch (Exception e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Loads an existing file.
+    *
+    * @param file
+    * @throws SQLException
+    */
+   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+
+      try (ResultSet rs = readLargeObject.executeQuery()) {
+         if (rs.next()) {
+            file.setWritePosition((int) rs.getBlob(1).length());
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Creates a new database row representing the supplied file.
+    *
+    * @param file
+    * @throws SQLException
+    */
+   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         createFile.setString(1, file.getFileName());
+         createFile.setString(2, file.getExtension());
+         createFile.setBytes(3, new byte[0]);
+         createFile.executeUpdate();
+         try (ResultSet keys = createFile.getGeneratedKeys()) {
+            keys.next();
+            file.setId(keys.getInt(1));
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Updates the fileName field to the new value.
+    *
+    * @param file
+    * @param newFileName
+    * @throws SQLException
+    */
+   public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         renameFile.setString(1, newFileName);
+         renameFile.setInt(2, file.getId());
+         renameFile.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Deletes the associated row in the database.
+    *
+    * @param file
+    * @throws SQLException
+    */
+   public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         deleteFile.setInt(1, file.getId());
+         deleteFile.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Persists data to this files associated database mapping.
+    *
+    * @param file
+    * @param data
+    * @return
+    * @throws Exception
+    */
+   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         appendToLargeObject.setBytes(1, data);
+         appendToLargeObject.setInt(2, file.getId());
+         appendToLargeObject.executeUpdate();
+         connection.commit();
+         return data.length;
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Reads data from the file (at file.readPosition) into the byteBuffer.
+    *
+    * @param file
+    * @param bytes
+    * @return
+    * @throws Exception
+    */
+   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+      int readLength = 0;
+      try (ResultSet rs = readLargeObject.executeQuery()) {
+         if (rs.next()) {
+            Blob blob = rs.getBlob(1);
+            readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
+            byte[] data = blob.getBytes(file.position() + 1, (int) readLength);
+            bytes.put(data);
+         }
+         connection.commit();
+         return readLength;
+      }
+      catch (Throwable e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Copy the data content of FileFrom to FileTo
+    *
+    * @param fileFrom
+    * @param fileTo
+    * @throws SQLException
+    */
+   public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         copyFileRecord.setInt(1, fileFrom.getId());
+         copyFileRecord.setInt(2, fileTo.getId());
+         copyFileRecord.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Drop all tables and data
+    */
+   public synchronized void destroy() throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         Statement statement = connection.createStatement();
+         statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
+      long bytesRemaining = objectLength - readPosition;
+      if (bytesRemaining > bufferSpace) {
+         return bufferSpace;
+      }
+      else {
+         return bytesRemaining;
+      }
+   }
+
+   public int getMaxSize() {
+      return sqlProvider.getMaxBlobSize();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 73bec72..6b91223 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -19,24 +19,20 @@ package org.apache.activemq.artemis.jdbc.store.file;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.jboss.logging.Logger;
 
 public class JDBCSequentialFile implements SequentialFile {
@@ -53,20 +49,6 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private int id = -1;
 
-   private final PreparedStatement appendToFile;
-
-   private final PreparedStatement deleteFile;
-
-   private final PreparedStatement readFile;
-
-   private final PreparedStatement createFile;
-
-   private final PreparedStatement selectFileByFileName;
-
-   private final PreparedStatement copyFileRecord;
-
-   private final PreparedStatement renameFile;
-
    private long readPosition = 0;
 
    private long writePosition = 0;
@@ -75,33 +57,30 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private JDBCSequentialFileFactory fileFactory;
 
-   private int maxSize;
+   private final Object writeLock;
 
-   private SQLProvider sqlProvider;
+   private final JDBCFileFactoryDriver dbDriver;
 
-   private final Object writeLock;
+   private static final Logger log = Logger.getLogger(JDBCSequentialFile.class.getName());
+
+   // Allows DB Drivers to cache meta data.
+   private Map<Object, Object> metaData = new ConcurrentHashMap<>();
 
    public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
                              final String filename,
-                             final SQLProvider sqlProvider,
                              final Executor executor,
+                             final JDBCFileFactoryDriver driver,
                              final Object writeLock) throws SQLException {
       this.fileFactory = fileFactory;
       this.filename = filename;
       this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
       this.executor = executor;
-      this.maxSize = sqlProvider.getMaxBlobSize();
-      this.sqlProvider = sqlProvider;
       this.writeLock = writeLock;
+      this.dbDriver = driver;
+   }
 
-      Connection connection = fileFactory.getConnection();
-      this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL());
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
-      this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL());
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+   public void setWritePosition(int writePosition) {
+      this.writePosition = writePosition;
    }
 
    @Override
@@ -117,35 +96,11 @@ public class JDBCSequentialFile implements SequentialFile {
    @Override
    public synchronized void open() throws Exception {
       if (!isOpen) {
-         try {
-            synchronized (writeLock) {
-               selectFileByFileName.setString(1, filename);
-
-               try (ResultSet rs = selectFileByFileName.executeQuery()) {
-                  if (!rs.next()) {
-                     createFile.setString(1, filename);
-                     createFile.setString(2, extension);
-                     createFile.setBytes(3, new byte[0]);
-                     createFile.executeUpdate();
-                     try (ResultSet keys = createFile.getGeneratedKeys()) {
-                        keys.next();
-                        this.id = keys.getInt(1);
-                     }
-                  }
-                  else {
-                     this.id = rs.getInt(1);
-                     this.writePosition = rs.getBlob(4).length();
-                  }
-               }
-            }
-         }
-         catch (SQLException e) {
-            ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e);
-            isOpen = false;
+         synchronized (writeLock) {
+            dbDriver.openFile(this);
+            isCreated = true;
+            isOpen = true;
          }
-
-         isCreated = true;
-         isOpen = true;
       }
    }
 
@@ -156,7 +111,7 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public boolean fits(int size) {
-      return writePosition + size <= maxSize;
+      return writePosition + size <= dbDriver.getMaxSize();
    }
 
    @Override
@@ -183,24 +138,20 @@ public class JDBCSequentialFile implements SequentialFile {
    public void delete() throws IOException, InterruptedException, ActiveMQException {
       try {
          if (isCreated) {
-            deleteFile.setInt(1, id);
-            deleteFile.executeUpdate();
+            synchronized (writeLock) {
+               dbDriver.deleteFile(this);
+            }
          }
       }
       catch (SQLException e) {
-         throw new IOException(e);
+         throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e);
       }
    }
 
    private synchronized int internalWrite(byte[] data, IOCallback callback) {
       try {
          synchronized (writeLock) {
-            int noBytes = data.length;
-            appendToFile.setBytes(1, data);
-            appendToFile.setInt(2, id);
-            int result = appendToFile.executeUpdate();
-            if (result < 1)
-               throw new ActiveMQException("No record found for file id: " + id);
+            int noBytes = dbDriver.writeToFile(this, data);
             seek(noBytes);
             if (callback != null)
                callback.done();
@@ -295,36 +246,19 @@ public class JDBCSequentialFile implements SequentialFile {
    }
 
    @Override
-   public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException {
+   public synchronized int read(ByteBuffer bytes, final IOCallback callback) throws SQLException {
       synchronized (writeLock) {
-         readFile.setInt(1, id);
-         try (ResultSet rs = readFile.executeQuery()) {
-            if (rs.next()) {
-               Blob blob = rs.getBlob(1);
-
-               long bytesRemaining = blob.length() - readPosition;
-               byte[] data;
-               if (bytesRemaining > bytes.remaining()) {
-                  // First index into blob is 1 (not 0)
-                  data = blob.getBytes(readPosition + 1, bytes.remaining());
-               }
-               else {
-                  // First index into blob is 1 (not 0)
-                  data = blob.getBytes(readPosition + 1, (int) bytesRemaining);
-               }
-
-               bytes.put(data);
-               readPosition += data.length;
-               if (callback != null)
-                  callback.done();
-
-               return data.length;
-            }
-            return 0;
+         try {
+            int read = dbDriver.readFromFile(this, bytes);
+            readPosition += read;
+            if (callback != null)
+               callback.done();
+            return read;
          }
          catch (Exception e) {
             if (callback != null)
                callback.onError(-1, e.getMessage());
+            e.printStackTrace();
             return 0;
          }
       }
@@ -352,8 +286,20 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public void sync() throws IOException {
-      // (mtaylor) We always write straight away, so we don't need to do anything here.
-      // (mtaylor) Is this meant to be blocking?
+      final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
+      executor.execute(new Runnable() {
+         @Override
+         public void run() {
+            callback.done();
+         }
+      });
+
+      try {
+         callback.waitCompletion();
+      }
+      catch (Exception e) {
+         throw new IOException(e);
+      }
    }
 
    @Override
@@ -363,15 +309,15 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public void renameTo(String newFileName) throws Exception {
-      renameFile.setString(1, newFileName);
-      renameFile.setInt(2, id);
-      renameFile.executeUpdate();
+      synchronized (writeLock) {
+         dbDriver.renameFile(this, newFileName);
+      }
    }
 
    @Override
    public SequentialFile cloneFile() {
       try {
-         JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock);
+         JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
          return clone;
       }
       catch (Exception e) {
@@ -385,9 +331,9 @@ public class JDBCSequentialFile implements SequentialFile {
       JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
       clone.open();
 
-      copyFileRecord.setInt(1, id);
-      copyFileRecord.setInt(2, clone.getId());
-      copyFileRecord.executeUpdate();
+      synchronized (writeLock) {
+         dbDriver.copyFileData(this, clone);
+      }
    }
 
    public int getId() {
@@ -416,4 +362,16 @@ public class JDBCSequentialFile implements SequentialFile {
    public File getJavaFile() {
       return null;
    }
+
+   public void addMetaData(Object key, Object value) {
+      metaData.put(key, value);
+   }
+
+   public Object removeMetaData(Object key) {
+      return metaData.remove(key);
+   }
+
+   public Object getMetaData(Object key) {
+      return metaData.get(key);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 4231907..07e30a9 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -18,17 +18,11 @@ package org.apache.activemq.artemis.jdbc.store.file;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -36,53 +30,59 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
 
-   private Connection connection;
-
-   private String connectionUrl;
-
-   private final Driver driver;
-
    private boolean started;
 
-   private final String tableName;
-
    private List<JDBCSequentialFile> files;
 
-   private PreparedStatement selectFileNamesByExtension;
-
    private Executor executor;
 
-   private SQLProvider sqlProvider;
-
    private Map<String, Object> fileLocks = new HashMap<>();
 
+   private final JDBCFileFactoryDriver dbDriver;
+
    public JDBCSequentialFileFactory(final String connectionUrl,
                                     final String tableName,
                                     final String className,
                                     Executor executor) throws Exception {
-      this.connectionUrl = connectionUrl;
       this.executor = executor;
-      this.tableName = tableName.toUpperCase();
-
       files = new ArrayList<>();
-      sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName);
-      driver = JDBCUtils.getDriver(className);
+      dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl);
    }
 
-   public Connection getConnection() {
-      return connection;
+   @Override
+   public synchronized void start() {
+      try {
+         if (!started) {
+            dbDriver.start();
+            started = true;
+         }
+      }
+      catch (Exception e) {
+         ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
+         started = false;
+      }
+   }
+
+   @Override
+   public synchronized void stop() {
+      try {
+         dbDriver.stop();
+      }
+      catch (SQLException e) {
+         ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
+      }
+      started = false;
    }
 
    @Override
    public SequentialFile createSequentialFile(String fileName) {
       try {
          fileLocks.putIfAbsent(fileName, new Object());
-         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName));
+         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, dbDriver, fileLocks.get(fileName));
          files.add(file);
          return file;
       }
@@ -99,15 +99,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public List<String> listFiles(String extension) throws Exception {
-      List<String> fileNames = new ArrayList<>();
-
-      selectFileNamesByExtension.setString(1, extension);
-      try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
-         while (rs.next()) {
-            fileNames.add(rs.getString(1));
-         }
-      }
-      return fileNames;
+      return dbDriver.listFiles(extension);
    }
 
    @Override
@@ -171,7 +163,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public void activateBuffer(SequentialFile file) {
-
    }
 
    @Override
@@ -180,34 +171,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
    }
 
    @Override
-   public synchronized void start() {
-      try {
-         if (!started) {
-            connection = driver.connect(connectionUrl, new Properties());
-            JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL());
-            selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
-            started = true;
-         }
-      }
-      catch (SQLException e) {
-         ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
-         started = false;
-      }
-   }
-
-   @Override
-   public synchronized void stop() {
-      try {
-         if (false)
-            connection.close();
-      }
-      catch (SQLException e) {
-         ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
-      }
-      started = false;
-   }
-
-   @Override
    public boolean isStarted() {
       return started;
    }
@@ -218,12 +181,8 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    @Override
    public void flush() {
-
    }
 
    public synchronized void destroy() throws SQLException {
-      Statement statement = connection.createStatement();
-      statement.executeUpdate(sqlProvider.getDropFileTableSQL());
-      stop();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
deleted file mode 100644
index c14036e..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
-
-public class DerbySQLProvider extends GenericSQLProvider {
-
-   // Derby max blob size = 2G
-   private static final int MAX_BLOB_SIZE = 2147483647;
-
-   private final String createFileTableSQL;
-
-   private final String appendToFileSQL;
-
-   public DerbySQLProvider(String tableName) {
-      super(tableName);
-
-      createFileTableSQL = "CREATE TABLE " + tableName +
-         "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
-         "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
-
-      appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
-   }
-
-   @Override
-   public int getMaxBlobSize() {
-      return MAX_BLOB_SIZE;
-   }
-
-   @Override
-   public String getCreateFileTableSQL() {
-      return createFileTableSQL;
-   }
-
-   @Override
-   public String getAppendToFileSQL() {
-      return appendToFileSQL;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
deleted file mode 100644
index c95edb3..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
-
-public class GenericSQLProvider implements SQLProvider {
-
-   // Default to lowest (MYSQL = 64k)
-   private static final int MAX_BLOB_SIZE = 64512;
-
-   private final String tableName;
-
-   private final String createFileTableSQL;
-
-   private final String insertFileSQL;
-
-   private final String selectFileNamesByExtensionSQL;
-
-   private final String selectIdByFileNameSQL;
-
-   private final String appendToFileSQL;
-
-   private final String readFileSQL;
-
-   private final String deleteFileSQL;
-
-   private final String updateFileNameByIdSQL;
-
-   private final String copyFileRecordByIdSQL;
-
-   private final String cloneFileRecordSQL;
-
-   private final String dropFileTableSQL;
-
-   public GenericSQLProvider(String tableName) {
-      this.tableName = tableName;
-
-      createFileTableSQL = "CREATE TABLE " + tableName +
-         "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
-
-      insertFileSQL = "INSERT INTO " + tableName +
-         " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
-
-      selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
-
-      selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
-
-      appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
-
-      readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
-
-      deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
-
-      updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
-
-      cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
-         "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
-
-      copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
-
-      dropFileTableSQL = "DROP TABLE " + tableName;
-   }
-
-   @Override
-   public int getMaxBlobSize() {
-      return MAX_BLOB_SIZE;
-   }
-
-   @Override
-   public String getTableName() {
-      return tableName;
-   }
-
-   @Override
-   public String getCreateFileTableSQL() {
-      return createFileTableSQL;
-   }
-
-   @Override
-   public String getInsertFileSQL() {
-      return insertFileSQL;
-   }
-
-   @Override
-   public String getSelectFileByFileName() {
-      return selectIdByFileNameSQL;
-   }
-
-   @Override
-   public String getSelectFileNamesByExtensionSQL() {
-      return selectFileNamesByExtensionSQL;
-   }
-
-   @Override
-   public String getAppendToFileSQL() {
-      return appendToFileSQL;
-   }
-
-   @Override
-   public String getReadFileSQL() {
-      return readFileSQL;
-   }
-
-   @Override
-   public String getDeleteFileSQL() {
-      return deleteFileSQL;
-   }
-
-   @Override
-   public String getUpdateFileNameByIdSQL() {
-      return updateFileNameByIdSQL;
-   }
-
-   @Override
-   public String getCopyFileRecordByIdSQL() {
-      return copyFileRecordByIdSQL;
-   }
-
-   @Override
-   public String getCloneFileRecordByIdSQL() {
-      return cloneFileRecordSQL;
-   }
-
-   @Override
-   public String getDropFileTableSQL() {
-      return dropFileTableSQL;
-   }
-
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
deleted file mode 100644
index e9fe36c..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
-
-public interface SQLProvider {
-
-   int getMaxBlobSize();
-
-   String getTableName();
-
-   String getCreateFileTableSQL();
-
-   String getInsertFileSQL();
-
-   String getSelectFileNamesByExtensionSQL();
-
-   String getSelectFileByFileName();
-
-   String getAppendToFileSQL();
-
-   String getReadFileSQL();
-
-   String getDeleteFileSQL();
-
-   String getUpdateFileNameByIdSQL();
-
-   String getCopyFileRecordByIdSQL();
-
-   String getDropFileTableSQL();
-
-   String getCloneFileRecordByIdSQL();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index f253167..1e6f393 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -17,21 +17,15 @@
 
 package org.apache.activemq.artemis.jdbc.store.journal;
 
-import java.sql.Connection;
-import java.sql.Driver;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -44,23 +38,18 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
 
-public class JDBCJournalImpl implements Journal {
+public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
    // Sync Delay in ms
    public static final int SYNC_DELAY = 5;
 
    private static int USER_VERSION = 1;
 
-   private final String tableName;
-
-   private final String jdbcDriverClass;
-
-   private Connection connection;
-
-   private List<JDBCJournalRecord> records;
+   private final List<JDBCJournalRecord> records;
 
    private PreparedStatement insertJournalRecords;
 
@@ -74,13 +63,9 @@ public class JDBCJournalImpl implements Journal {
 
    private boolean started;
 
-   private String jdbcUrl;
-
    private Timer syncTimer;
 
-   private Driver dbDriver;
-
-   private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+   private final Object journalLock = new Object();
 
    private final String timerThread;
 
@@ -90,68 +75,48 @@ public class JDBCJournalImpl implements Journal {
    // Sequence ID for journal records
    private AtomicLong seq = new AtomicLong(0);
 
+   private Logger logger = Logger.getLogger(this.getClass());
+
    public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) {
-      this.tableName = tableName;
-      this.jdbcUrl = jdbcUrl;
-      this.jdbcDriverClass = jdbcDriverClass;
+      super(tableName, jdbcUrl, jdbcDriverClass);
       timerThread = "Timer JDBC Journal(" + tableName + ")";
-
       records = new ArrayList<>();
    }
 
    @Override
    public void start() throws Exception {
-      dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
-
-      try {
-         connection = dbDriver.connect(jdbcUrl, new Properties());
-      }
-      catch (SQLException e) {
-         ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcUrl);
-         throw new RuntimeException("Error connecting to database", e);
-      }
-
-      JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName));
-
-      insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
-      selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
-      countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
-      deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
-      deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName));
-
+      super.start();
       syncTimer = new Timer(timerThread, true);
       syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
-
       started = true;
    }
 
-   @Override
-   public void stop() throws Exception {
-      stop(true);
+   protected void createSchema() throws SQLException {
+      createTable(sqlProvider.getCreateJournalTableSQL());
    }
 
-   public synchronized void stop(boolean shutdownConnection) throws Exception {
-      if (started) {
-         journalLock.writeLock().lock();
-
-         syncTimer.cancel();
+   protected void prepareStatements() throws SQLException {
+      insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
+      selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
+      countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL());
+      deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL());
+      deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL());
+   }
 
-         sync();
-         if (shutdownConnection) {
-            connection.close();
+   @Override
+   public synchronized void stop() throws Exception {
+      if (started) {
+         synchronized (journalLock) {
+            syncTimer.cancel();
+            sync();
+            started = false;
+            super.stop();
          }
-
-         started = false;
-         journalLock.writeLock().unlock();
       }
    }
 
    public synchronized void destroy() throws Exception {
-      connection.setAutoCommit(false);
-      Statement statement = connection.createStatement();
-      statement.executeUpdate("DROP TABLE " + tableName);
-      statement.close();
-      connection.commit();
+      super.destroy();
       stop();
    }
 
@@ -159,8 +124,11 @@ public class JDBCJournalImpl implements Journal {
       if (!started)
          return 0;
 
-      List<JDBCJournalRecord> recordRef = records;
-      records = new ArrayList<JDBCJournalRecord>();
+      List<JDBCJournalRecord> recordRef = new ArrayList<>();
+      synchronized (records) {
+         recordRef.addAll(records);
+         records.clear();
+      }
 
       // We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
       List<Long> deletedRecords = new ArrayList<>();
@@ -215,12 +183,18 @@ public class JDBCJournalImpl implements Journal {
          deleteJournalTxRecords.executeBatch();
 
          connection.commit();
-
-         cleanupTxRecords(deletedRecords, committedTransactions);
          success = true;
       }
       catch (SQLException e) {
-         performRollback(connection, recordRef);
+         performRollback(recordRef);
+      }
+
+      try {
+         if (success)
+            cleanupTxRecords(deletedRecords, committedTransactions);
+      }
+      catch (SQLException e) {
+         e.printStackTrace();
       }
 
       executeCallbacks(recordRef, success);
@@ -230,12 +204,11 @@ public class JDBCJournalImpl implements Journal {
    /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
       we remove the Tx Records (i.e. PREPARE, COMMIT). */
    private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
-
+      connection.rollback();
       List<RecordInfo> iterableCopy;
       List<TransactionHolder> iterableCopyTx = new ArrayList<>();
       iterableCopyTx.addAll(transactions.values());
 
-
       for (Long txId : committedTx) {
          transactions.get(txId).committed = true;
       }
@@ -260,9 +233,8 @@ public class JDBCJournalImpl implements Journal {
       }
    }
 
-   private void performRollback(Connection connection, List<JDBCJournalRecord> records) {
+   private void performRollback(List<JDBCJournalRecord> records) {
       try {
-         connection.rollback();
          for (JDBCJournalRecord record : records) {
             if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
                removeTxRecord(record);
@@ -306,18 +278,18 @@ public class JDBCJournalImpl implements Journal {
          record.setIoCompletion(callback);
       }
 
-      try {
-         journalLock.writeLock().lock();
+      synchronized (journalLock) {
          if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
             addTxRecord(record);
          }
-         records.add(record);
-      }
-      finally {
-         journalLock.writeLock().unlock();
+
+         synchronized (records) {
+            records.add(record);
+         }
       }
 
-      if (callback != null) callback.waitCompletion();
+      if (callback != null)
+         callback.waitCompletion();
    }
 
    private synchronized void addTxRecord(JDBCJournalRecord record) {
@@ -703,12 +675,12 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public final void synchronizationLock() {
-      journalLock.writeLock().lock();
+      logger.error("Replication is not supported with JDBC Store");
    }
 
    @Override
    public final void synchronizationUnlock() {
-      journalLock.writeLock().unlock();
+      logger.error("Replication is not supported with JDBC Store");
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
new file mode 100644
index 0000000..6efa170
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.sql;
+
+/**
+ * Default {@link SQLProvider}: every SQL statement is assembled once, in the constructor, from the
+ * table name and is then handed out unchanged by the getters.
+ * <p>
+ * Both the file-table statements and the journal-table statements are built against the same
+ * {@code tableName}, so a single instance describes exactly one table.
+ */
+public class GenericSQLProvider implements SQLProvider {
+
+   // Default to lowest (MYSQL = 64k)
+   private static final int MAX_BLOB_SIZE = 64512;
+
+   // Protected: subclasses use it to build their own statements against the same table.
+   protected final String tableName;
+
+   // ---- File (large message) table statements ----
+
+   private final String createFileTableSQL;
+
+   private final String insertFileSQL;
+
+   private final String selectFileNamesByExtensionSQL;
+
+   private final String selectIdByFileNameSQL;
+
+   private final String appendToFileSQL;
+
+   private final String readLargeObjectSQL;
+
+   private final String deleteFileSQL;
+
+   private final String updateFileNameByIdSQL;
+
+   private final String copyFileRecordByIdSQL;
+
+   private final String cloneFileRecordSQL;
+
+   private final String dropFileTableSQL;
+
+   // ---- Journal table statements ----
+
+   private final String createJournalTableSQL;
+
+   private final String insertJournalRecordsSQL;
+
+   private final String selectJournalRecordsSQL;
+
+   private final String deleteJournalRecordsSQL;
+
+   private final String deleteJournalTxRecordsSQL;
+
+   private final String countJournalRecordsSQL;
+
+   /**
+    * Builds all statements for the given table.
+    *
+    * @param tableName table name; it is concatenated verbatim into each statement (no quoting or escaping)
+    */
+   public GenericSQLProvider(String tableName) {
+      this.tableName = tableName;
+
+      // NOTE(review): AUTO_INCREMENT here and CONCAT in appendToFileSQL are not accepted by every
+      // database; presumably dialect-specific subclasses override the affected statements - confirm.
+      createFileTableSQL = "CREATE TABLE " + tableName +
+         "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+
+      insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
+
+      selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
+
+      // Despite the field name, this selects the whole row (ID, FILENAME, EXTENSION, DATA) by file name.
+      selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
+
+      // Parameters: 1 = bytes to append, 2 = row ID.
+      appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
+
+      readLargeObjectSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
+
+      deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
+
+      // Parameters: 1 = new file name, 2 = row ID.
+      updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
+
+      // Inserts a new row whose FILENAME, EXTENSION and DATA are copied from the row with the given ID.
+      cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
+         "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
+
+      // Overwrites DATA of an existing row. Parameters: 1 = source row ID, 2 = destination row ID.
+      copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
+
+      dropFileTableSQL = "DROP TABLE " + tableName;
+
+      createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
+
+      // Column order here must match the column order of createJournalTableSQL / selectJournalRecordsSQL.
+      insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
+
+      // Records are returned in ascending seq order.
+      selectJournalRecordsSQL = "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC";
+
+      deleteJournalRecordsSQL = "DELETE FROM " + tableName + " WHERE id = ?";
+
+      // Removes every row that carries the given txId.
+      deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
+
+      countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+
+   @Override
+   public String getTableName() {
+      return tableName;
+   }
+
+   // Journal SQL Statements
+   @Override
+   public String getCreateJournalTableSQL() {
+      return createJournalTableSQL;
+   }
+
+   @Override
+   public String getInsertJournalRecordsSQL() {
+      return insertJournalRecordsSQL;
+   }
+
+   @Override
+   public String getSelectJournalRecordsSQL() {
+      return selectJournalRecordsSQL;
+   }
+
+   @Override
+   public String getDeleteJournalRecordsSQL() {
+      return deleteJournalRecordsSQL;
+   }
+
+   @Override
+   public String getDeleteJournalTxRecordsSQL() {
+      return deleteJournalTxRecordsSQL;
+   }
+
+   @Override
+   public String getCountJournalRecordsSQL() {
+      return countJournalRecordsSQL;
+   }
+
+   // Large Message Statements
+   @Override
+   public String getCreateFileTableSQL() {
+      return createFileTableSQL;
+   }
+
+   @Override
+   public String getInsertFileSQL() {
+      return insertFileSQL;
+   }
+
+   // Returns the statement that selects the full row (ID, FILENAME, EXTENSION, DATA) by file name.
+   @Override
+   public String getSelectFileByFileName() {
+      return selectIdByFileNameSQL;
+   }
+
+   @Override
+   public String getSelectFileNamesByExtensionSQL() {
+      return selectFileNamesByExtensionSQL;
+   }
+
+   @Override
+   public String getAppendToLargeObjectSQL() {
+      return appendToFileSQL;
+   }
+
+   @Override
+   public String getReadLargeObjectSQL() {
+      return readLargeObjectSQL;
+   }
+
+   @Override
+   public String getDeleteFileSQL() {
+      return deleteFileSQL;
+   }
+
+   @Override
+   public String getUpdateFileNameByIdSQL() {
+      return updateFileNameByIdSQL;
+   }
+
+   @Override
+   public String getCopyFileRecordByIdSQL() {
+      return copyFileRecordByIdSQL;
+   }
+
+   @Override
+   public String getCloneFileRecordByIdSQL() {
+      return cloneFileRecordSQL;
+   }
+
+   @Override
+   public String getDropFileTableSQL() {
+      return dropFileTableSQL;
+   }
+
+   // Always true for the generic provider.
+   @Override
+   public boolean closeConnectionOnShutdown() {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
new file mode 100644
index 0000000..5645ebc
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.sql;
+
+/**
+ * Supplies the SQL text, plus a few related settings, used by the JDBC store for one table, so
+ * that database-specific variants can be provided behind a single interface.
+ */
+public interface SQLProvider {
+
+   /** Largest blob this provider reports as storable (presumably in bytes - confirm with callers). */
+   int getMaxBlobSize();
+
+   // ---- Journal table statements ----
+
+   String getCreateJournalTableSQL();
+
+   String getInsertJournalRecordsSQL();
+
+   String getSelectJournalRecordsSQL();
+
+   String getDeleteJournalRecordsSQL();
+
+   String getDeleteJournalTxRecordsSQL();
+
+   /** Name of the table the statements of this provider refer to. */
+   String getTableName();
+
+   // ---- File table statements ----
+
+   String getCreateFileTableSQL();
+
+   String getInsertFileSQL();
+
+   String getSelectFileNamesByExtensionSQL();
+
+   String getSelectFileByFileName();
+
+   String getAppendToLargeObjectSQL();
+
+   String getReadLargeObjectSQL();
+
+   String getDeleteFileSQL();
+
+   String getUpdateFileNameByIdSQL();
+
+   String getCopyFileRecordByIdSQL();
+
+   String getDropFileTableSQL();
+
+   String getCloneFileRecordByIdSQL();
+
+   /** Journal statement (grouped here in declaration order only): counts the rows of the table. */
+   String getCountJournalRecordsSQL();
+
+   /** Whether the JDBC connection should be closed when the driver using this provider is stopped. */
+   boolean closeConnectionOnShutdown();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
index bd200bd..1a8fba4 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -63,7 +63,7 @@ public class JDBCSequentialFileFactoryTest {
    }
 
    @After
-   public void tearDown() throws SQLException {
+   public void tearDown() throws Exception {
       factory.destroy();
    }
 
@@ -126,8 +126,8 @@ public class JDBCSequentialFileFactoryTest {
       JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt");
       file.copyTo(copy);
 
-      checkData(copy, src);
       checkData(file, src);
+      checkData(copy, src);
    }
 
    @Test
@@ -145,7 +145,12 @@ public class JDBCSequentialFileFactoryTest {
       IOCallbackCountdown callback = new IOCallbackCountdown(1);
       file.internalWrite(src, callback);
 
+      assertEquals(bufferSize, file.size());
       JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile();
+      copy.open();
+
+      assertEquals(bufferSize, copy.size());
+      assertEquals(bufferSize, file.size());
    }
 
    private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 25b7cd1..a5d55ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -88,10 +88,8 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
 
       beforeStop();
 
-      ((JDBCJournalImpl) bindingsJournal).stop(false);
-
-      ((JDBCJournalImpl) messageJournal).stop(false);
-
+      bindingsJournal.stop();
+      messageJournal.stop();
       largeMessagesFactory.stop();
 
       singleThreadExecutor.shutdown();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 86d8cf6..31ad1d6 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -40,6 +40,10 @@ import java.lang.management.ManagementFactory;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.net.ServerSocket;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -124,6 +128,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.jlibaio.LibaioContext;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
@@ -448,12 +454,34 @@ public abstract class ActiveMQTestBase extends Assert {
       DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
       dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
       dbStorageConfiguration.setBindingsTableName("BINDINGS");
-      dbStorageConfiguration.setMessageTableName("MESSAGES");
-      dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
+      dbStorageConfiguration.setMessageTableName("MESSAGE");
+      dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
+      dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
 
       configuration.setStoreConfiguration(dbStorageConfiguration);
    }
 
+   public void destroyTables(List<String> tableNames) throws Exception {
+      Driver driver = JDBCUtils.getDriver(getJDBCClassName());
+      Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
+      Statement statement = connection.createStatement();
+      try {
+         for (String tableName : tableNames) {
+            SQLProvider sqlProvider = JDBCUtils.getSQLProvider(getJDBCClassName(), tableName);
+            ResultSet rs = connection.getMetaData().getTables(null, null, sqlProvider.getTableName(), null);
+            if (rs.next()) {
+               statement.execute("DROP TABLE " + sqlProvider.getTableName());
+            }
+         }
+      }
+      catch (Throwable e) {
+         e.printStackTrace();
+      }
+      finally {
+         connection.close();
+      }
+   }
+
    protected Map<String, Object> generateInVMParams(final int node) {
       Map<String, Object> params = new HashMap<>();
 
@@ -797,7 +825,11 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected final String getTestJDBCConnectionUrl() {
-      return "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true";
+      return System.getProperty("jdbc.connection.url", "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true");
+   }
+
+   protected final String getJDBCClassName() {
+      return System.getProperty("jdbc.driver.class","org.apache.derby.jdbc.EmbeddedDriver");
    }
 
    protected final File getTestDirfile() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
index 6664d1e..03ed5b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
@@ -79,6 +79,13 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
       this.storeType = storeType;
    }
 
+   /**
+    * Runs the superclass teardown and then, when the test ran against the DATABASE store type,
+    * drops the BINDINGS, LARGE_MESSAGE and MESSAGE tables (presumably so that table contents do
+    * not carry over into the next test).
+    *
+    * @throws Exception if the superclass teardown or the table cleanup fails
+    */
+   @Override
+   public void tearDown() throws Exception {
+      super.tearDown();
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE"));
+      }
+   }
+
    @Parameterized.Parameters(name = "storeType={0}")
    public static Collection<Object[]> data() {
       Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
index 391cccf..4f4c5de 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java
@@ -40,7 +40,6 @@ public class RolesConfigurationStorageTest extends StorageManagerTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-
       mapExpectedSets = new HashMap<>();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 75e5eb2..886cde3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.persistence;
 
-import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -105,15 +103,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
          jmsJournal = null;
       }
 
-      // Stops the database engine early to stop thread leaks showing.
-      if (storeType == StoreConfiguration.StoreType.DATABASE) {
-         try {
-            DriverManager.getConnection("jdbc:derby:;shutdown=true");
-         }
-         catch (SQLException e) {
-         }
-      }
-
+      destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"}));
       super.tearDown();
       if (exception != null)
          throw exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/466d43c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
index 5394565..06a3f77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java
@@ -125,6 +125,9 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
    public void tearDown() throws Exception {
       MBeanServerFactory.releaseMBeanServer(mbeanServer);
       super.tearDown();
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE"));
+      }
    }
 
    @Test


[2/5] activemq-artemis git commit: Add PostGres Driver

Posted by cl...@apache.org.
Add PostGres Driver


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/79904aeb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/79904aeb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/79904aeb

Branch: refs/heads/master
Commit: 79904aeb64a5e5b7bc9aa8f2060d32327e9092ca
Parents: 466d43c
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue May 31 12:19:29 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Jun 1 16:09:42 2016 +0100

----------------------------------------------------------------------
 artemis-jdbc-store/pom.xml                      |   6 +
 .../activemq/artemis/jdbc/store/JDBCUtils.java  |  31 +-
 .../jdbc/store/drivers/AbstractJDBCDriver.java  |  59 +++-
 .../drivers/postgres/PostgresSQLProvider.java   |  53 +++
 .../PostgresSequentialSequentialFileDriver.java | 169 +++++++++
 .../jdbc/store/file/JDBCFileFactoryDriver.java  | 344 -------------------
 .../jdbc/store/file/JDBCSequentialFile.java     |   6 +-
 .../store/file/JDBCSequentialFileFactory.java   |   3 +-
 .../file/JDBCSequentialFileFactoryDriver.java   | 323 +++++++++++++++++
 .../jdbc/store/journal/JDBCJournalImpl.java     |   3 +-
 pom.xml                                         |   8 +
 tests/integration-tests/pom.xml                 |  10 +-
 12 files changed, 646 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index 86fe1a6..bb54c12 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -53,6 +53,7 @@
          <scope>test</scope>
       </dependency>
 
+      <!-- Database driver support -->
       <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
@@ -60,6 +61,11 @@
       </dependency>
 
       <dependency>
+         <groupId>org.postgresql</groupId>
+         <artifactId>postgresql</artifactId>
+      </dependency>
+
+      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-journal</artifactId>
          <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index a1bde56..04ac242 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -24,7 +24,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.JDBCFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
@@ -70,24 +72,35 @@ public class JDBCUtils {
       if (driverClass.contains("derby")) {
          return new DerbySQLProvider(tableName);
       }
+      else if (driverClass.contains("postgres")) {
+         return new PostgresSQLProvider(tableName);
+      }
       else {
          return new GenericSQLProvider(tableName);
       }
    }
 
-   public static JDBCFileFactoryDriver getDBFileDriver(String driverClass, String tableName, String jdbcConnectionUrl) throws SQLException {
-      JDBCFileFactoryDriver dbDriver;
+   public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
+                                                                 String tableName,
+                                                                 String jdbcConnectionUrl) throws SQLException {
+      JDBCSequentialFileFactoryDriver dbDriver;
       if (driverClass.contains("derby")) {
-         dbDriver = new JDBCFileFactoryDriver();
+         dbDriver = new JDBCSequentialFileFactoryDriver();
          dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
-         dbDriver.setConnectionURL(jdbcConnectionUrl);
-         dbDriver.setDriverClass(driverClass);
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
+      else if (driverClass.contains("postgres")) {
+         dbDriver = new PostgresSequentialSequentialFileDriver();
+         dbDriver.setSqlProvider(new PostgresSQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
       }
       else {
-         dbDriver = new JDBCFileFactoryDriver();
+         dbDriver = new JDBCSequentialFileFactoryDriver();
          dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
-         dbDriver.setConnectionURL(jdbcConnectionUrl);
-         dbDriver.setDriverClass(driverClass);
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
       }
       return dbDriver;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index 1c282c0..6d8be71 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -33,14 +33,17 @@ public abstract class AbstractJDBCDriver {
 
    protected Connection connection;
 
-   protected final SQLProvider sqlProvider;
+   protected SQLProvider sqlProvider;
 
-   protected final String jdbcConnectionUrl;
+   protected String jdbcConnectionUrl;
 
-   protected final String jdbcDriverClass;
+   protected String jdbcDriverClass;
 
    protected Driver dbDriver;
 
+   public AbstractJDBCDriver() {
+   }
+
    public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
       this.jdbcConnectionUrl = jdbcConnectionUrl;
       this.jdbcDriverClass = jdbcDriverClass;
@@ -53,7 +56,7 @@ public abstract class AbstractJDBCDriver {
       prepareStatements();
    }
 
-   public void stop() throws Exception {
+   public void stop() throws SQLException {
       if (sqlProvider.closeConnectionOnShutdown()) {
          connection.close();
       }
@@ -79,10 +82,48 @@ public abstract class AbstractJDBCDriver {
    }
 
    public void destroy() throws Exception {
-      connection.setAutoCommit(false);
-      Statement statement = connection.createStatement();
-      statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
-      statement.close();
-      connection.commit();
+      try {
+         connection.setAutoCommit(false);
+         Statement statement = connection.createStatement();
+         statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
+         statement.close();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   public Connection getConnection() {
+      return connection;
+   }
+
+   public void setConnection(Connection connection) {
+      this.connection = connection;
+   }
+
+   public SQLProvider getSqlProvider() {
+      return sqlProvider;
+   }
+
+   public void setSqlProvider(SQLProvider sqlProvider) {
+      this.sqlProvider = sqlProvider;
+   }
+
+   public String getJdbcConnectionUrl() {
+      return jdbcConnectionUrl;
+   }
+
+   public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
+      this.jdbcConnectionUrl = jdbcConnectionUrl;
+   }
+
+   public String getJdbcDriverClass() {
+      return jdbcDriverClass;
+   }
+
+   public void setJdbcDriverClass(String jdbcDriverClass) {
+      this.jdbcDriverClass = jdbcDriverClass;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
new file mode 100644
index 0000000..664202b
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+
+/**
+ * {@link GenericSQLProvider} variant for PostgreSQL. It reuses the generic statements but creates
+ * the file table with a SERIAL key and an OID data column, creates the journal table with BYTEA
+ * columns, and reports a 1 GB maximum blob size.
+ */
+public class PostgresSQLProvider extends GenericSQLProvider {
+
+   // BYTEA Size used in Journal
+   private static final int MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB
+
+   private final String createFileTableSQL;
+
+   private final String createJournalTableSQL;
+
+   /**
+    * @param tName table name; it is lower-cased before any statement is built (presumably because
+    *              PostgreSQL folds unquoted identifiers to lower case - confirm)
+    */
+   public PostgresSQLProvider(String tName) {
+      // Locale.ROOT makes the folding independent of the JVM default locale; with the no-arg
+      // toLowerCase() a Turkish default locale would turn the 'I' in "BINDINGS" into a dotless i.
+      super(tName.toLowerCase(java.util.Locale.ROOT));
+      createFileTableSQL = "CREATE TABLE " + tableName +
+         "(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
+
+      createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)";
+   }
+
+   @Override
+   public String getCreateFileTableSQL() {
+      return createFileTableSQL;
+   }
+
+   @Override
+   public String getCreateJournalTableSQL() {
+      return createJournalTableSQL;
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
new file mode 100644
index 0000000..4d42d7f
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
+
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
+import org.postgresql.PGConnection;
+import org.postgresql.largeobject.LargeObject;
+import org.postgresql.largeobject.LargeObjectManager;
+
+/**
+ * PostgreSQL specific file driver.  File content is stored through the PostgreSQL large object API; the
+ * row in the file table only holds the OID of the large object that carries the data.
+ */
+public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
+
+   // Key under which the large object OID is cached in the file's meta data.
+   private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
+
+   public PostgresSequentialSequentialFileDriver() throws SQLException {
+      super();
+   }
+
+   /**
+    * Creates a new large object and inserts a row for the supplied file that references it.  The generated
+    * row id is stored on the file.
+    *
+    * @param file the file to create
+    * @throws SQLException if the large object or the row can not be created; the transaction is rolled back
+    */
+   @Override
+   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+
+         LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+         long oid = lobjManager.createLO();
+
+         createFile.setString(1, file.getFileName());
+         createFile.setString(2, file.getExtension());
+         createFile.setLong(3, oid);
+         createFile.executeUpdate();
+
+         try (ResultSet keys = createFile.getGeneratedKeys()) {
+            keys.next();
+            file.setId(keys.getInt(1));
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Loads an existing file: sets the file's write position to the current size of its large object.
+    * Nothing is changed when no row (and therefore no OID) exists for the file id.
+    *
+    * @param file the file to load
+    * @throws SQLException if the OID or the large object size can not be read
+    */
+   @Override
+   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+      // getOID() runs (and caches) the lookup itself, so no separate query is issued here; the previous
+      // implementation re-executed the same prepared statement while its own ResultSet was still open.
+      if (getOID(file) != null) {
+         file.setWritePosition(getPostGresLargeObjectSize(file));
+      }
+   }
+
+   /**
+    * Appends the supplied bytes to the end of the file's large object.
+    *
+    * @param file the file to append to
+    * @param data the bytes to append
+    * @return the number of bytes written, i.e. {@code data.length}
+    * @throws SQLException if the file has no large object or the write fails; the transaction is rolled back
+    */
+   @Override
+   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+      long oid = requireOID(file);
+      try {
+         connection.setAutoCommit(false);
+         LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
+         // Position at the end so that the data is appended.
+         largeObject.seek(largeObject.size());
+         largeObject.write(data);
+         largeObject.close();
+         connection.commit();
+      }
+      catch (Exception e) {
+         connection.rollback();
+         throw e;
+      }
+      return data.length;
+   }
+
+   /**
+    * Reads from the file's large object, starting at {@code file.position()}, into the supplied buffer.
+    *
+    * @param file  the file to read from
+    * @param bytes the buffer to fill; at most {@code bytes.remaining()} bytes are read
+    * @return the read length computed by {@code calculateReadLength}
+    * @throws SQLException if the file has no large object or the read fails; the transaction is rolled back
+    */
+   @Override
+   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+      long oid = requireOID(file);
+      try {
+         connection.setAutoCommit(false);
+         LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+         int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
+
+         if (readLength > 0) {
+            if (file.position() > 0) {
+               largeObject.seek((int) file.position());
+            }
+            byte[] data = largeObject.read(readLength);
+            bytes.put(data);
+         }
+
+         largeObject.close();
+         connection.commit();
+
+         return readLength;
+      }
+      catch (Exception e) {
+         // Roll back on any failure (not only SQLException), consistent with writeToFile.
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Returns the OID of the file's large object.  The OID is looked up by file id on first use and cached
+    * in the file's meta data afterwards.
+    *
+    * @param file the file whose OID is wanted
+    * @return the OID, or {@code null} when no row exists for the file id
+    * @throws SQLException if the lookup fails; the transaction is rolled back
+    */
+   private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
+      Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
+      if (oid == null) {
+         connection.setAutoCommit(false);
+         readLargeObject.setInt(1, file.getId());
+         try (ResultSet rs = readLargeObject.executeQuery()) {
+            if (rs.next()) {
+               oid = rs.getLong(1);
+               file.addMetaData(POSTGRES_OID_KEY, oid);
+            }
+            connection.commit();
+         }
+         catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
+      }
+      return oid;
+   }
+
+   /**
+    * Same as {@link #getOID(JDBCSequentialFile)} but fails with a descriptive exception instead of
+    * returning {@code null}.
+    */
+   private long requireOID(JDBCSequentialFile file) throws SQLException {
+      Long oid = getOID(file);
+      if (oid == null) {
+         throw new SQLException("No large object OID found for file: " + file.getFileName());
+      }
+      return oid;
+   }
+
+   /**
+    * Returns the size of the file's large object.
+    *
+    * @param file the file whose large object is measured
+    * @return the size in bytes, or 0 when the file has no OID
+    * @throws SQLException if the large object can not be opened; the transaction is rolled back
+    */
+   private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
+      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+
+      int size = 0;
+      Long oid = getOID(file);
+      if (oid != null) {
+         try {
+            connection.setAutoCommit(false);
+            LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+            size = largeObject.size();
+            largeObject.close();
+            connection.commit();
+         }
+         catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
+      }
+      return size;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
deleted file mode 100644
index 04af009..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file;
-
-import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
-import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-
-public class JDBCFileFactoryDriver {
-
-   protected Connection connection;
-
-   protected SQLProvider sqlProvider;
-
-   protected PreparedStatement deleteFile;
-
-   protected PreparedStatement createFile;
-
-   protected PreparedStatement selectFileByFileName;
-
-   protected PreparedStatement copyFileRecord;
-
-   protected PreparedStatement renameFile;
-
-   protected PreparedStatement readLargeObject;
-
-   protected PreparedStatement appendToLargeObject;
-
-   protected PreparedStatement selectFileNamesByExtension;
-
-   protected String connectionUrl;
-
-   protected String driverClass;
-
-   public JDBCFileFactoryDriver() {
-   }
-
-   public void setConnectionURL(String connectionUrl) {
-      this.connectionUrl = connectionUrl;
-   }
-
-   public void setSqlProvider(SQLProvider sqlProvider) {
-      this.sqlProvider = sqlProvider;
-   }
-
-   public void setDriverClass(String driverClass) {
-      this.driverClass = driverClass;
-   }
-
-   public void start() throws Exception {
-      Driver driver = JDBCUtils.getDriver(driverClass);
-      connection = driver.connect(connectionUrl, new Properties());
-      JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), sqlProvider.getCreateFileTableSQL());
-      prepareStatements();
-   }
-
-   public void stop() throws SQLException {
-      if (sqlProvider.closeConnectionOnShutdown())
-         connection.close();
-   }
-
-   protected void prepareStatements() throws SQLException {
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
-      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
-      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
-      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
-   }
-
-   public synchronized List<String> listFiles(String extension) throws Exception {
-      List<String> fileNames = new ArrayList<>();
-      try {
-         connection.setAutoCommit(false);
-         selectFileNamesByExtension.setString(1, extension);
-         try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
-            while (rs.next()) {
-               fileNames.add(rs.getString(1));
-            }
-         }
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-      return fileNames;
-   }
-
-   /**
-    * Opens the supplied file.  If the file does not exist in the database it will create a new one.
-    *
-    * @param file
-    * @return
-    * @throws SQLException
-    */
-   public void openFile(JDBCSequentialFile file) throws SQLException {
-      int fileId = fileExists(file);
-      if (fileId < 0) {
-         createFile(file);
-      }
-      else {
-         file.setId(fileId);
-         loadFile(file);
-      }
-   }
-
-   /**
-    * Checks to see if a file with filename and extension exists.  If so returns the ID of the file or returns -1.
-    *
-    * @param file
-    * @return
-    * @throws SQLException
-    */
-   public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      selectFileByFileName.setString(1, file.getFileName());
-      try (ResultSet rs = selectFileByFileName.executeQuery()) {
-         int id = rs.next() ? rs.getInt(1) : -1;
-         connection.commit();
-         return id;
-      }
-      catch (Exception e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Loads an existing file.
-    *
-    * @param file
-    * @throws SQLException
-    */
-   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
-
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            file.setWritePosition((int) rs.getBlob(1).length());
-         }
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Creates a new database row representing the supplied file.
-    *
-    * @param file
-    * @throws SQLException
-    */
-   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         createFile.setString(1, file.getFileName());
-         createFile.setString(2, file.getExtension());
-         createFile.setBytes(3, new byte[0]);
-         createFile.executeUpdate();
-         try (ResultSet keys = createFile.getGeneratedKeys()) {
-            keys.next();
-            file.setId(keys.getInt(1));
-         }
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Updates the fileName field to the new value.
-    *
-    * @param file
-    * @param newFileName
-    * @throws SQLException
-    */
-   public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         renameFile.setString(1, newFileName);
-         renameFile.setInt(2, file.getId());
-         renameFile.executeUpdate();
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Deletes the associated row in the database.
-    *
-    * @param file
-    * @throws SQLException
-    */
-   public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         deleteFile.setInt(1, file.getId());
-         deleteFile.executeUpdate();
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Persists data to this files associated database mapping.
-    *
-    * @param file
-    * @param data
-    * @return
-    * @throws Exception
-    */
-   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         appendToLargeObject.setBytes(1, data);
-         appendToLargeObject.setInt(2, file.getId());
-         appendToLargeObject.executeUpdate();
-         connection.commit();
-         return data.length;
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Reads data from the file (at file.readPosition) into the byteBuffer.
-    *
-    * @param file
-    * @param bytes
-    * @return
-    * @throws Exception
-    */
-   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
-      int readLength = 0;
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            Blob blob = rs.getBlob(1);
-            readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
-            byte[] data = blob.getBytes(file.position() + 1, (int) readLength);
-            bytes.put(data);
-         }
-         connection.commit();
-         return readLength;
-      }
-      catch (Throwable e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Copy the data content of FileFrom to FileTo
-    *
-    * @param fileFrom
-    * @param fileTo
-    * @throws SQLException
-    */
-   public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         copyFileRecord.setInt(1, fileFrom.getId());
-         copyFileRecord.setInt(2, fileTo.getId());
-         copyFileRecord.executeUpdate();
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Drop all tables and data
-    */
-   public synchronized void destroy() throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         Statement statement = connection.createStatement();
-         statement.executeUpdate(sqlProvider.getDropFileTableSQL());
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
-      long bytesRemaining = objectLength - readPosition;
-      if (bytesRemaining > bufferSpace) {
-         return bufferSpace;
-      }
-      else {
-         return bytesRemaining;
-      }
-   }
-
-   public int getMaxSize() {
-      return sqlProvider.getMaxBlobSize();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6b91223..5de8761 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -59,9 +59,7 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private final Object writeLock;
 
-   private final JDBCFileFactoryDriver dbDriver;
-
-   private static final Logger log = Logger.getLogger(JDBCSequentialFile.class.getName());
+   private final JDBCSequentialFileFactoryDriver dbDriver;
 
    // Allows DB Drivers to cache meta data.
    private Map<Object, Object> metaData = new ConcurrentHashMap<>();
@@ -69,7 +67,7 @@ public class JDBCSequentialFile implements SequentialFile {
    public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
                              final String filename,
                              final Executor executor,
-                             final JDBCFileFactoryDriver driver,
+                             final JDBCSequentialFileFactoryDriver driver,
                              final Object writeLock) throws SQLException {
       this.fileFactory = fileFactory;
       this.filename = filename;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 07e30a9..3454757 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -42,7 +42,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    private Map<String, Object> fileLocks = new HashMap<>();
 
-   private final JDBCFileFactoryDriver dbDriver;
+   private final JDBCSequentialFileFactoryDriver dbDriver;
 
    public JDBCSequentialFileFactory(final String connectionUrl,
                                     final String tableName,
@@ -184,5 +184,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
    }
 
    public synchronized void destroy() throws SQLException {
+      dbDriver.destroy();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
new file mode 100644
index 0000000..f8ad06b
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+
+/**
+ * Generic JDBC driver backing {@link JDBCSequentialFile}s.  Every file is a row in the file table and its
+ * content is stored in that row's large object column.  Each public operation runs in its own transaction
+ * on the shared connection: commit on success, rollback on failure.
+ */
+public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
+
+   protected PreparedStatement deleteFile;
+
+   protected PreparedStatement createFile;
+
+   protected PreparedStatement selectFileByFileName;
+
+   protected PreparedStatement copyFileRecord;
+
+   protected PreparedStatement renameFile;
+
+   protected PreparedStatement readLargeObject;
+
+   protected PreparedStatement appendToLargeObject;
+
+   protected PreparedStatement selectFileNamesByExtension;
+
+   public JDBCSequentialFileFactoryDriver() {
+      super();
+   }
+
+   public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
+      super(tableName, jdbcConnectionUrl, jdbcDriverClass);
+   }
+
+   /**
+    * Starts the driver by delegating to the superclass.
+    *
+    * @throws Exception if the superclass fails to start
+    */
+   @Override
+   public void start() throws Exception {
+      super.start();
+   }
+
+   /**
+    * Creates the file table using the SQL supplied by the SQL provider.
+    */
+   @Override
+   protected void createSchema() throws SQLException {
+      createTable(sqlProvider.getCreateFileTableSQL());
+   }
+
+   /**
+    * Prepares all statements used by this driver on the shared connection.
+    */
+   @Override
+   protected void prepareStatements() throws SQLException {
+      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
+      // The generated key is the file id, see createFile().
+      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
+      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
+      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
+      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
+      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
+      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+   }
+
+   /**
+    * Lists the names of all files with the supplied extension.
+    *
+    * @param extension the extension to select by
+    * @return the file names returned by the query, possibly empty
+    * @throws Exception if the query fails; the transaction is rolled back on SQLException
+    */
+   public synchronized List<String> listFiles(String extension) throws Exception {
+      List<String> fileNames = new ArrayList<>();
+      try {
+         connection.setAutoCommit(false);
+         selectFileNamesByExtension.setString(1, extension);
+         try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
+            while (rs.next()) {
+               fileNames.add(rs.getString(1));
+            }
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+      return fileNames;
+   }
+
+   /**
+    * Opens the supplied file.  If the file does not exist in the database it will create a new one,
+    * otherwise the existing id is set on the file and the file is loaded.
+    *
+    * @param file the file to open
+    * @throws SQLException if the lookup, creation or load fails
+    */
+   public void openFile(JDBCSequentialFile file) throws SQLException {
+      int fileId = fileExists(file);
+      if (fileId < 0) {
+         createFile(file);
+      }
+      else {
+         file.setId(fileId);
+         loadFile(file);
+      }
+   }
+
+   /**
+    * Checks to see if a file with the file name of the supplied file exists.
+    *
+    * @param file the file to look up (only its file name is used)
+    * @return the ID of the file, or -1 if no such file exists
+    * @throws SQLException if the query fails; the transaction is rolled back
+    */
+   public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      selectFileByFileName.setString(1, file.getFileName());
+      try (ResultSet rs = selectFileByFileName.executeQuery()) {
+         int id = rs.next() ? rs.getInt(1) : -1;
+         connection.commit();
+         return id;
+      }
+      catch (Exception e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Loads an existing file: the write position of the file is set to the length of its stored data.
+    *
+    * @param file the file to load; its id must be set
+    * @throws SQLException if the query fails; the transaction is rolled back
+    */
+   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+
+      try (ResultSet rs = readLargeObject.executeQuery()) {
+         if (rs.next()) {
+            file.setWritePosition((int) rs.getBlob(1).length());
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Creates a new database row representing the supplied file, with empty data.  The generated row id
+    * is stored on the file.
+    *
+    * @param file the file to create
+    * @throws SQLException if the insert fails; the transaction is rolled back
+    */
+   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         createFile.setString(1, file.getFileName());
+         createFile.setString(2, file.getExtension());
+         createFile.setBytes(3, new byte[0]);
+         createFile.executeUpdate();
+         try (ResultSet keys = createFile.getGeneratedKeys()) {
+            keys.next();
+            file.setId(keys.getInt(1));
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Updates the fileName field to the new value.
+    *
+    * @param file        the file to rename (identified by its id)
+    * @param newFileName the new file name
+    * @throws SQLException if the update fails; the transaction is rolled back
+    */
+   public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         renameFile.setString(1, newFileName);
+         renameFile.setInt(2, file.getId());
+         renameFile.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Deletes the associated row in the database.
+    *
+    * @param file the file to delete (identified by its id)
+    * @throws SQLException if the delete fails; the transaction is rolled back
+    */
+   public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         deleteFile.setInt(1, file.getId());
+         deleteFile.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Persists data to this files associated database mapping by appending it to the stored data.
+    *
+    * @param file the file to append to (identified by its id)
+    * @param data the bytes to append
+    * @return the number of bytes written, i.e. {@code data.length}
+    * @throws SQLException if the update fails; the transaction is rolled back
+    */
+   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         appendToLargeObject.setBytes(1, data);
+         appendToLargeObject.setInt(2, file.getId());
+         appendToLargeObject.executeUpdate();
+         connection.commit();
+         return data.length;
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Reads data from the file (starting at {@code file.position()}) into the byteBuffer.
+    *
+    * @param file  the file to read from (identified by its id)
+    * @param bytes the buffer to fill; at most {@code bytes.remaining()} bytes are read
+    * @return the number of bytes read, 0 if no row exists for the file
+    * @throws SQLException if the read fails; the transaction is rolled back
+    */
+   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+      int readLength = 0;
+      try (ResultSet rs = readLargeObject.executeQuery()) {
+         if (rs.next()) {
+            Blob blob = rs.getBlob(1);
+            readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
+            // Blob positions are 1-based, file positions are 0-based.
+            byte[] data = blob.getBytes(file.position() + 1, readLength);
+            bytes.put(data);
+         }
+         connection.commit();
+         return readLength;
+      }
+      catch (Throwable e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Copy the data content of FileFrom to FileTo
+    *
+    * @param fileFrom the file whose data is copied
+    * @param fileTo   the file that receives the data
+    * @throws SQLException if the update fails; the transaction is rolled back
+    */
+   public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         copyFileRecord.setInt(1, fileFrom.getId());
+         copyFileRecord.setInt(2, fileTo.getId());
+         copyFileRecord.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Drop all tables and data
+    *
+    * @throws SQLException if the drop fails; the transaction is rolled back
+    */
+   public synchronized void destroy() throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         // try-with-resources: the statement was previously never closed.
+         try (Statement statement = connection.createStatement()) {
+            statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Calculates how many bytes can be read: the bytes left in the object after the read position, capped
+    * by the space available in the buffer.
+    *
+    * @param objectLength total length of the stored object
+    * @param bufferSpace  space available in the target buffer
+    * @param readPosition position the read starts at
+    * @return the smaller of {@code bufferSpace} and {@code objectLength - readPosition}
+    */
+   public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
+      return Math.min(bufferSpace, objectLength - readPosition);
+   }
+
+   /**
+    * @return the maximum blob size reported by the SQL provider
+    */
+   public int getMaxSize() {
+      return sqlProvider.getMaxBlobSize();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 1e6f393..6c05112 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -95,6 +95,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
       createTable(sqlProvider.getCreateJournalTableSQL());
    }
 
+   @Override
    protected void prepareStatements() throws SQLException {
       insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
       selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
@@ -104,7 +105,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public synchronized void stop() throws Exception {
+   public synchronized void stop() throws SQLException {
       if (started) {
          synchronized (journalLock) {
             syncTimer.cancel();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba4d6a5..d8e4f11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,14 @@
          </dependency>
 
          <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>9.4-1205-jdbc4</version>
+            <scope>provided</scope>
+            <!-- postgresql license -->
+         </dependency>
+
+         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections-testframework</artifactId>
             <version>${commons.collections.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 90f5425..2d62ce0 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -240,10 +240,18 @@
          <artifactId>jboss-javaee</artifactId>
          <version>5.0.0.GA</version>
       </dependency>
+
+      <!-- DB Test Deps -->
       <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
-         <version>${apache.derby.version}</version>
+         <scope>test</scope>
+      </dependency>
+
+      <dependency>
+         <groupId>org.postgresql</groupId>
+         <artifactId>postgresql</artifactId>
+         <scope>test</scope>
       </dependency>
       <!--Vertx provided dependencies-->
       <dependency>


[5/5] activemq-artemis git commit: This closes #550

Posted by cl...@apache.org.
This closes #550


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e64ea527
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e64ea527
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e64ea527

Branch: refs/heads/master
Commit: e64ea5278f8eb0e4d2b706f20fb65179d65b6788
Parents: ae73001 634fc1b
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jun 1 17:30:51 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jun 1 17:30:51 2016 -0400

----------------------------------------------------------------------
 artemis-jdbc-store/pom.xml                      |   7 +
 .../activemq/artemis/jdbc/store/JDBCUtils.java  |  47 ++-
 .../jdbc/store/drivers/AbstractJDBCDriver.java  | 129 ++++++++
 .../store/drivers/derby/DerbySQLProvider.java   |  59 ++++
 .../store/drivers/mysql/MySQLSQLProvider.java   |  64 ++++
 .../drivers/postgres/PostgresSQLProvider.java   |  53 +++
 .../PostgresSequentialSequentialFileDriver.java | 169 ++++++++++
 .../jdbc/store/file/JDBCSequentialFile.java     | 170 ++++------
 .../store/file/JDBCSequentialFileFactory.java   |  98 ++----
 .../file/JDBCSequentialFileFactoryDriver.java   | 323 +++++++++++++++++++
 .../jdbc/store/file/sql/DerbySQLProvider.java   |  52 ---
 .../jdbc/store/file/sql/GenericSQLProvider.java | 143 --------
 .../jdbc/store/file/sql/SQLProvider.java        |  46 ---
 .../jdbc/store/journal/JDBCJournalImpl.java     | 131 +++-----
 .../jdbc/store/sql/GenericSQLProvider.java      | 201 ++++++++++++
 .../artemis/jdbc/store/sql/SQLProvider.java     |  60 ++++
 .../file/JDBCSequentialFileFactoryTest.java     |   9 +-
 .../impl/journal/JDBCJournalStorageManager.java |   6 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |  38 ++-
 .../artemis/tests/util/ThreadLeakCheckRule.java |   4 +
 pom.xml                                         |   8 +
 tests/integration-tests/pom.xml                 |  11 +-
 .../largemessage/LargeMessageTestBase.java      |   7 +
 .../RolesConfigurationStorageTest.java          |   1 -
 .../persistence/StorageManagerTestBase.java     |  12 +-
 .../integration/xa/BasicXaRecoveryTest.java     |   3 +
 tests/stress-tests/pom.xml                      |   6 +
 27 files changed, 1336 insertions(+), 521 deletions(-)
----------------------------------------------------------------------



[4/5] activemq-artemis git commit: Added MySQL Support

Posted by cl...@apache.org.
Added MySQL Support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/634fc1b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/634fc1b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/634fc1b4

Branch: refs/heads/master
Commit: 634fc1b4827f139b089be660889dc93257d7b507
Parents: fda6789
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Jun 1 12:33:36 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Jun 1 19:04:25 2016 +0100

----------------------------------------------------------------------
 artemis-jdbc-store/pom.xml                      |  1 +
 .../activemq/artemis/jdbc/store/JDBCUtils.java  | 10 +++
 .../store/drivers/mysql/MySQLSQLProvider.java   | 64 ++++++++++++++++++++
 .../artemis/tests/util/ThreadLeakCheckRule.java |  4 ++
 pom.xml                                         |  2 +-
 tests/integration-tests/pom.xml                 |  1 +
 6 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/634fc1b4/artemis-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index bb54c12..80f69c4 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -63,6 +63,7 @@
       <dependency>
          <groupId>org.postgresql</groupId>
          <artifactId>postgresql</artifactId>
+         <scope>provided</scope>
       </dependency>
 
       <dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/634fc1b4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index 04ac242..8ce08c6 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
 import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
 import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
@@ -75,6 +76,9 @@ public class JDBCUtils {
       else if (driverClass.contains("postgres")) {
          return new PostgresSQLProvider(tableName);
       }
+      else if (driverClass.contains("mysql")) {
+         return new MySQLSQLProvider(tableName);
+      }
       else {
          return new GenericSQLProvider(tableName);
       }
@@ -96,6 +100,12 @@ public class JDBCUtils {
          dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
          dbDriver.setJdbcDriverClass(driverClass);
       }
+      else if (driverClass.contains("mysql")) {
+         dbDriver = new JDBCSequentialFileFactoryDriver();
+         dbDriver.setSqlProvider(new MySQLSQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
       else {
          dbDriver = new JDBCSequentialFileFactoryDriver();
          dbDriver.setSqlProvider(new GenericSQLProvider(tableName));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/634fc1b4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java
new file mode 100644
index 0000000..1400382
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.mysql;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+
+public class MySQLSQLProvider extends GenericSQLProvider {
+   // LONGBLOB holds up to 4GB, but "4 * 1024 * 1024 * 1024" wraps to 0 in int arithmetic; use the largest int instead.
+   private static final int MAX_BLOB_SIZE = Integer.MAX_VALUE; // 2GB - 1
+
+   private final String createFileTableSQL;
+
+   private final String createJournalTableSQL;
+
+   private final String copyFileRecordByIdSQL;
+
+   public MySQLSQLProvider(String tName) {
+      super(tName.toLowerCase(java.util.Locale.ROOT)); // locale-neutral: a Turkish default locale must not mangle 'I'
+      // File table: auto-incremented ID as primary key, file contents kept in a LONGBLOB column.
+      createFileTableSQL = "CREATE TABLE " + tableName +
+         "(ID INTEGER NOT NULL AUTO_INCREMENT," +
+         "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA LONGBLOB, PRIMARY KEY(ID)) ENGINE=InnoDB;";
+      // Journal table: same table name as above; record and txData payloads are LONGBLOB columns.
+      createJournalTableSQL = "CREATE TABLE " + tableName +
+         "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record LONGBLOB,txDataSize INTEGER,txData LONGBLOB,txCheckNoRecords INTEGER,seq BIGINT) ENGINE=InnoDB;";
+      // Copy statement: the first parameter selects the source row, the second the row whose DATA is overwritten.
+      copyFileRecordByIdSQL = " UPDATE " + tableName + ", (SELECT DATA AS FROM_DATA FROM " + tableName +
+         " WHERE id=?) SELECT_COPY SET DATA=FROM_DATA WHERE id=?;";
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+
+   @Override
+   public String getCreateFileTableSQL() {
+      return createFileTableSQL;
+   }
+
+   @Override
+   public String getCreateJournalTableSQL() {
+      return createJournalTableSQL;
+   }
+
+   @Override
+   public String getCopyFileRecordByIdSQL() {
+      return copyFileRecordByIdSQL;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/634fc1b4/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
index 3b53d53..fa881d5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
@@ -185,6 +185,10 @@ public class ThreadLeakCheckRule extends ExternalResource {
          // The derby engine is initialized once, and lasts the lifetime of the VM
          return true;
       }
+      else if (threadName.contains("Abandoned connection cleanup thread")) {
+         // MySQL Engine checks for abandoned connections
+         return true;
+      }
       else if (threadName.contains("Timer")) {
          // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
          return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/634fc1b4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d8e4f11..d0327fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -236,7 +236,7 @@
             <scope>provided</scope>
             <!-- postgresql license -->
          </dependency>
-
+         
          <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections-testframework</artifactId>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/634fc1b4/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 2d62ce0..b0303c7 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -253,6 +253,7 @@
          <artifactId>postgresql</artifactId>
          <scope>test</scope>
       </dependency>
+
       <!--Vertx provided dependencies-->
       <dependency>
          <groupId>io.vertx</groupId>