You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ma...@apache.org on 2009/09/04 20:50:39 UTC
svn commit: r811514 - in
/jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core:
data/db/ persistence/bundle/util/ util/db/
Author: martijnh
Date: Fri Sep 4 18:50:38 2009
New Revision: 811514
URL: http://svn.apache.org/viewvc?rev=811514&view=rev
Log:
JCR-1456 Database connection pooling
* Refactored the Datastore to use the new db utility classes that use connection pooling.
Removed:
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DatabaseHelper.java
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/persistence/bundle/util/ConnectionRecoveryManager.java
Modified:
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DerbyDataStore.java
jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java
Modified: jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java?rev=811514&r1=811513&r2=811514&view=diff
==============================================================================
--- jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (original)
+++ jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java Fri Sep 4 18:50:38 2009
@@ -20,9 +20,12 @@
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
import org.apache.jackrabbit.core.persistence.bundle.util.TrackingInputStream;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager.StreamWrapper;
+import org.apache.jackrabbit.core.util.db.CheckSchemaOperation;
+import org.apache.jackrabbit.core.util.db.ConnectionFactory;
+import org.apache.jackrabbit.core.util.db.ConnectionHelper;
+import org.apache.jackrabbit.core.util.db.DbUtility;
+import org.apache.jackrabbit.core.util.db.StreamWrapper;
import org.apache.jackrabbit.util.Text;
import org.apache.jackrabbit.uuid.UUID;
import org.slf4j.Logger;
@@ -37,8 +40,6 @@
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,7 +49,7 @@
import java.util.Properties;
import java.util.WeakHashMap;
-import javax.jcr.RepositoryException;
+import javax.sql.DataSource;
/**
* A data store implementation that stores the records in a database using JDBC.
@@ -100,11 +101,6 @@
public static final int DEFAULT_MIN_RECORD_LENGTH = 100;
/**
- * The default value for the maximum connections.
- */
- public static final int DEFAULT_MAX_CONNECTIONS = 3;
-
- /**
* Write to a temporary file to get the length (slow, but always works).
* This is the default setting.
*/
@@ -172,16 +168,6 @@
protected int minRecordLength = DEFAULT_MIN_RECORD_LENGTH;
/**
- * The maximum number of open connections.
- */
- protected int maxConnections = DEFAULT_MAX_CONNECTIONS;
-
- /**
- * A list of connections
- */
- protected Pool connectionPool;
-
- /**
* The prefix for the datastore table, empty by default.
*/
protected String tablePrefix = "";
@@ -291,34 +277,37 @@
protected List<String> temporaryInUse = Collections.synchronizedList(new ArrayList<String>());
/**
+ * The {@link ConnectionHelper} set in the {@link #init(String)} method.
+ */
+ protected ConnectionHelper conHelper;
+
+ /**
* {@inheritDoc}
*/
public DataRecord addRecord(InputStream stream) throws DataStoreException {
ResultSet rs = null;
TempFileInputStream fileInput = null;
- ConnectionRecoveryManager conn = getConnection();
String id = null, tempId = null;
try {
long now;
- for (int i = 0; i < ConnectionRecoveryManager.TRIALS; i++) {
+ while(true) {
try {
now = System.currentTimeMillis();
id = UUID.randomUUID().toString();
tempId = TEMP_PREFIX + id;
// SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID=?
- PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{tempId});
- rs = prep.getResultSet();
+ rs = conHelper.exec(selectMetaSQL, new Object[]{tempId}, false, 0);
if (rs.next()) {
// re-try in the very, very unlikely event that the row already exists
continue;
}
// INSERT INTO DATASTORE VALUES(?, 0, ?, NULL)
- conn.executeStmt(insertTempSQL, new Object[]{tempId, new Long(now)});
+ conHelper.exec(insertTempSQL, new Object[]{tempId, new Long(now)});
break;
} catch (Exception e) {
throw convert("Can not insert new record", e);
} finally {
- DatabaseHelper.closeSilently(rs);
+ DbUtility.close(rs);
}
}
if (id == null) {
@@ -344,7 +333,7 @@
throw new DataStoreException("Unsupported stream store algorithm: " + storeStream);
}
// UPDATE DATASTORE SET DATA=? WHERE ID=?
- conn.executeStmt(updateDataSQL, new Object[]{wrapper, tempId});
+ conHelper.exec(updateDataSQL, new Object[]{wrapper, tempId});
now = System.currentTimeMillis();
long length = in.getPosition();
DataIdentifier identifier = new DataIdentifier(digest.digest());
@@ -353,17 +342,15 @@
// UPDATE DATASTORE SET ID=?, LENGTH=?, LAST_MODIFIED=?
// WHERE ID=?
// AND NOT EXISTS(SELECT ID FROM DATASTORE WHERE ID=?)
- PreparedStatement prep = conn.executeStmt(updateSQL, new Object[]{
+ int count = conHelper.update(updateSQL, new Object[]{
id, new Long(length), new Long(now),
tempId, id});
- int count = prep.getUpdateCount();
if (count == 0) {
// update count is 0, meaning such a row already exists
// DELETE FROM DATASTORE WHERE ID=?
- conn.executeStmt(deleteSQL, new Object[]{tempId});
+ conHelper.exec(deleteSQL, new Object[]{tempId});
// SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID=?
- prep = conn.executeStmt(selectMetaSQL, new Object[]{id});
- rs = prep.getResultSet();
+ rs = conHelper.exec(selectMetaSQL, new Object[]{id}, false, 0);
if (rs.next()) {
long oldLength = rs.getLong(1);
long lastModified = rs.getLong(2);
@@ -387,8 +374,7 @@
if (tempId != null) {
temporaryInUse.remove(tempId);
}
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ DbUtility.close(rs);
if (fileInput != null) {
try {
fileInput.close();
@@ -417,7 +403,6 @@
* {@inheritDoc}
*/
public synchronized int deleteAllOlderThan(long min) throws DataStoreException {
- ConnectionRecoveryManager conn = getConnection();
try {
ArrayList<String> touch = new ArrayList<String>();
ArrayList<DataIdentifier> ids = new ArrayList<DataIdentifier>(inUse.keySet());
@@ -431,12 +416,9 @@
updateLastModifiedDate(key, 0);
}
// DELETE FROM DATASTORE WHERE LAST_MODIFIED<?
- PreparedStatement prep = conn.executeStmt(deleteOlderSQL, new Long[]{new Long(min)});
- return prep.getUpdateCount();
+ return conHelper.update(deleteOlderSQL, new Long[]{new Long(min)});
} catch (Exception e) {
throw convert("Can not delete records", e);
- } finally {
- putBack(conn);
}
}
@@ -444,13 +426,11 @@
* {@inheritDoc}
*/
public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
- ConnectionRecoveryManager conn = getConnection();
ArrayList<DataIdentifier> list = new ArrayList<DataIdentifier>();
ResultSet rs = null;
try {
// SELECT ID FROM DATASTORE
- PreparedStatement prep = conn.executeStmt(selectAllSQL, new Object[0]);
- rs = prep.getResultSet();
+ rs = conHelper.exec(selectAllSQL, new Object[0], false, 0);
while (rs.next()) {
String id = rs.getString(1);
if (!id.startsWith(TEMP_PREFIX)) {
@@ -462,8 +442,7 @@
} catch (Exception e) {
throw convert("Can not read records", e);
} finally {
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ DbUtility.close(rs);
}
}
@@ -488,14 +467,12 @@
* {@inheritDoc}
*/
public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
- ConnectionRecoveryManager conn = getConnection();
usesIdentifier(identifier);
ResultSet rs = null;
try {
String id = identifier.toString();
// SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID = ?
- PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{id});
- rs = prep.getResultSet();
+ rs = conHelper.exec(selectMetaSQL, new Object[]{id}, false, 0);
if (!rs.next()) {
throw new DataStoreException("Record not found: " + identifier);
}
@@ -506,8 +483,7 @@
} catch (Exception e) {
throw convert("Can not read identifier " + identifier, e);
} finally {
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ DbUtility.close(rs);
}
}
@@ -532,36 +508,29 @@
* or if the given identifier is invalid
*/
InputStream openStream(DbInputStream inputStream, DataIdentifier identifier) throws DataStoreException {
- ConnectionRecoveryManager conn = null;
ResultSet rs = null;
try {
- conn = getConnection();
// SELECT ID, DATA FROM DATASTORE WHERE ID = ?
- PreparedStatement prep = conn.executeStmt(selectDataSQL, new Object[]{identifier.toString()});
- rs = prep.getResultSet();
+ rs = conHelper.exec(selectDataSQL, new Object[]{identifier.toString()}, false, 0);
if (!rs.next()) {
throw new DataStoreException("Record not found: " + identifier);
}
InputStream stream = rs.getBinaryStream(2);
if (stream == null) {
stream = new ByteArrayInputStream(new byte[0]);
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ DbUtility.close(rs);
} else if (copyWhenReading) {
// If we copy while reading, create a temp file and close the stream
File temp = moveToTempFile(stream);
stream = new TempFileInputStream(temp);
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ DbUtility.close(rs);
} else {
stream = new BufferedInputStream(stream);
- inputStream.setConnection(conn);
inputStream.setResultSet(rs);
}
return stream;
} catch (Exception e) {
- DatabaseHelper.closeSilently(rs);
- putBack(conn);
+ DbUtility.close(rs);
throw convert("Retrieving database resource ", e);
}
}
@@ -572,26 +541,42 @@
public synchronized void init(String homeDir) throws DataStoreException {
try {
initDatabaseType();
- connectionPool = new Pool(this, maxConnections);
- ConnectionRecoveryManager conn = getConnection();
- DatabaseMetaData meta = conn.getConnection().getMetaData();
- log.info("Using JDBC driver " + meta.getDriverName() + " " + meta.getDriverVersion());
- meta.getDriverVersion();
- ResultSet rs = meta.getTables(null, null, schemaObjectPrefix + tableSQL, null);
- boolean exists = rs.next();
- rs.close();
- if (!exists) {
- // CREATE TABLE DATASTORE(ID VARCHAR(255) PRIMARY KEY,
- // LENGTH BIGINT, LAST_MODIFIED BIGINT, DATA BLOB)
- conn.executeStmt(createTableSQL, null);
- }
- putBack(conn);
+
+ conHelper = createConnectionHelper(ConnectionFactory.getDataSource(getDriver(), getUrl(), getUser(),
+ getPassword()));
+
+ createCheckSchemaOperation().run();
+
} catch (Exception e) {
throw convert("Can not init data store, driver=" + driver + " url=" + url + " user=" + user +
" schemaObjectPrefix=" + schemaObjectPrefix + " tableSQL=" + tableSQL + " createTableSQL=" + createTableSQL, e);
}
}
+ /**
+ * This method is called from the {@link #init(String)} method of this class and returns a
+ * {@link ConnectionHelper} instance which is assigned to the {@code conHelper} field. Subclasses may
+ * override it to return a specialized connection helper.
+ *
+ * @param dataSrc the {@link DataSource} of this data store
+ * @return a {@link ConnectionHelper}
+ * @throws Exception on error
+ */
+ protected ConnectionHelper createConnectionHelper(DataSource dataSrc) throws Exception {
+ return new ConnectionHelper(dataSrc);
+ }
+
+ /**
+ * This method is called from {@link #init(String)} after the
+ * {@link #createConnectionHelper(DataSource)} method, and returns a default {@link CheckSchemaOperation}.
+ *
+ * @return a new {@link CheckSchemaOperation} instance
+ */
+ protected final CheckSchemaOperation createCheckSchemaOperation() {
+ String tableName = tablePrefix + schemaObjectPrefix + tableSQL;
+ return new CheckSchemaOperation(conHelper, new ByteArrayInputStream(createTableSQL.getBytes()), tableName);
+ }
+
protected void initDatabaseType() throws DataStoreException {
boolean failIfNotFound;
if (databaseType == null) {
@@ -716,17 +701,14 @@
if (lastModified < minModifiedDate) {
long now = System.currentTimeMillis();
Long n = new Long(now);
- ConnectionRecoveryManager conn = getConnection();
try {
// UPDATE DATASTORE SET LAST_MODIFIED = ? WHERE ID = ? AND LAST_MODIFIED < ?
- conn.executeStmt(updateLastModifiedSQL, new Object[]{
+ conHelper.exec(updateLastModifiedSQL, new Object[]{
n, key, n
});
return now;
} catch (Exception e) {
throw convert("Can not update lastModified", e);
- } finally {
- putBack(conn);
}
}
return lastModified;
@@ -829,11 +811,6 @@
* {@inheritDoc}
*/
public synchronized void close() throws DataStoreException {
- ArrayList<ConnectionRecoveryManager> list = connectionPool.getAll();
- for (ConnectionRecoveryManager conn : list) {
- conn.close();
- }
- list.clear();
}
protected void usesIdentifier(DataIdentifier identifier) {
@@ -855,53 +832,24 @@
}
}
- protected ConnectionRecoveryManager getConnection() throws DataStoreException {
- try {
- ConnectionRecoveryManager conn = (ConnectionRecoveryManager) connectionPool.get();
- conn.setAutoReconnect(true);
- return conn;
- } catch (InterruptedException e) {
- throw new DataStoreException("Interrupted", e);
- } catch (RepositoryException e) {
- throw new DataStoreException("Can not open a new connection", e);
- }
- }
-
- protected void putBack(ConnectionRecoveryManager conn) throws DataStoreException {
- try {
- connectionPool.add(conn);
- } catch (InterruptedException e) {
- throw new DataStoreException("Interrupted", e);
- }
- }
-
/**
* Get the maximum number of concurrent connections.
*
+ * @deprecated the data store no longer maintains its own connection pool; this method always returns -1
* @return the maximum number of connections.
*/
public int getMaxConnections() {
- return maxConnections;
+ return -1;
}
/**
* Set the maximum number of concurrent connections in the pool.
* At least 3 connections are required if the garbage collection process is used.
*
+ * @deprecated the data store no longer maintains its own connection pool; the given value is ignored
* @param maxConnections the new value
*/
public void setMaxConnections(int maxConnections) {
- this.maxConnections = maxConnections;
- }
-
- /**
- * Create a new connection.
- *
- * @return the new connection
- */
- public ConnectionRecoveryManager createNewConnection() throws RepositoryException {
- ConnectionRecoveryManager conn = new ConnectionRecoveryManager(false, driver, url, user, password);
- return conn;
}
/**
Modified: jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java?rev=811514&r1=811513&r2=811514&view=diff
==============================================================================
--- jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java (original)
+++ jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbInputStream.java Fri Sep 4 18:50:38 2009
@@ -23,7 +23,7 @@
import org.apache.commons.io.input.AutoCloseInputStream;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
+import org.apache.jackrabbit.core.util.db.DbUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +39,7 @@
protected DbDataStore store;
protected DataIdentifier identifier;
protected boolean endOfStream;
-
- protected ConnectionRecoveryManager conn;
+
protected ResultSet rs;
@@ -130,16 +129,9 @@
// some additional database objects
// may need to be closed
if (rs != null) {
- DatabaseHelper.closeSilently(rs);
+ DbUtility.close(rs);
rs = null;
}
- if (conn != null) {
- try {
- store.putBack(conn);
- } catch (DataStoreException e) {
- log.info("Error closing DbResource", e);
- }
- }
}
}
@@ -208,16 +200,6 @@
}
/**
- * Set the database connection of this input stream. This object must be
- * closed once the stream is closed.
- *
- * @param conn the connection
- */
- void setConnection(ConnectionRecoveryManager conn) {
- this.conn = conn;
- }
-
- /**
* Set the result set of this input stream. This object must be closed once
* the stream is closed.
*
@@ -226,5 +208,4 @@
void setResultSet(ResultSet rs) {
this.rs = rs;
}
-
}
Modified: jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DerbyDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DerbyDataStore.java?rev=811514&r1=811513&r2=811514&view=diff
==============================================================================
--- jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DerbyDataStore.java (original)
+++ jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DerbyDataStore.java Fri Sep 4 18:50:38 2009
@@ -16,14 +16,13 @@
*/
package org.apache.jackrabbit.core.data.db;
-import java.sql.DriverManager;
import java.sql.SQLException;
+import javax.sql.DataSource;
+
import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.persistence.bundle.DerbyPersistenceManager;
-import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.jackrabbit.core.util.db.ConnectionHelper;
+import org.apache.jackrabbit.core.util.db.DerbyConnectionHelper;
/**
* The Derby data store closes the database when the data store is closed
@@ -32,49 +31,22 @@
public class DerbyDataStore extends DbDataStore {
/**
- * Logger instance
+ * {@inheritDoc}
*/
- private static Logger log = LoggerFactory.getLogger(DerbyDataStore.class);
+ @Override
+ protected ConnectionHelper createConnectionHelper(DataSource dataSrc) throws Exception {
+ return new DerbyConnectionHelper(dataSrc);
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public synchronized void close() throws DataStoreException {
super.close();
-
- // check for embedded driver
- if (!DerbyPersistenceManager.DERBY_EMBEDDED_DRIVER.equals(getDriver())) {
- return;
- }
-
try {
-
- // prepare connection url for issuing shutdown command
- ConnectionRecoveryManager connectionManager = getConnection();
-
- String url = connectionManager.getConnection().getMetaData().getURL();
- int pos = url.lastIndexOf(';');
- if (pos != -1) {
- // strip any attributes from connection url
- url = url.substring(0, pos);
- }
- url += ";shutdown=true";
-
- // we have to reset the connection to 'autoCommit=true' before closing it;
- // otherwise Derby would mysteriously complain about some pending uncommitted
- // changes which can't possibly be true.
- // @todo further investigate
- connectionManager.getConnection().setAutoCommit(true);
-
- // need to call it again because we just opened a connection,
- // and super.close() closes it.
- super.close();
-
- // now it's safe to shutdown the embedded Derby database
- try {
- DriverManager.getConnection(url);
- } catch (SQLException e) {
- // a shutdown command always raises a SQLException
- log.info(e.getMessage());
- }
- } catch (Exception e) {
+ ((DerbyConnectionHelper) conHelper).shutDown(getDriver());
+ } catch (SQLException e) {
throw new DataStoreException(e);
}
}
Modified: jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java?rev=811514&r1=811513&r2=811514&view=diff
==============================================================================
--- jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java (original)
+++ jackrabbit/sandbox/JCR-1456/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/util/db/StreamWrapper.java Fri Sep 4 18:50:38 2009
@@ -25,8 +25,9 @@
/**
* Creates a wrapper for the given InputStream that can
- * safely be passed as a parameter to the <code>executeStmt</code>
- * methods in the {@link ConnectionRecoveryManager} class.
+ * safely be passed as a parameter to the {@link ConnectionHelper#exec(String, Object...)},
+ * {@link ConnectionHelper#exec(String, Object[], boolean, int)} and
+ * {@link ConnectionHelper#update(String, Object[])} methods.
*
* @param in the InputStream to wrap
* @param size the size of the input stream