You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by si...@apache.org on 2011/09/26 22:59:29 UTC
svn commit: r1176052 - in /commons/proper/dbutils/trunk/src:
main/java/org/apache/commons/dbutils/AsyncQueryRunner.java
test/java/org/apache/commons/dbutils/AsyncQueryRunnerTest.java
Author: simonetripodi
Date: Mon Sep 26 20:59:29 2011
New Revision: 1176052
URL: http://svn.apache.org/viewvc?rev=1176052&view=rev
Log:
[DBUTILS-78] Add asynchronous batch, query, and update calls
Modified:
commons/proper/dbutils/trunk/src/main/java/org/apache/commons/dbutils/AsyncQueryRunner.java
commons/proper/dbutils/trunk/src/test/java/org/apache/commons/dbutils/AsyncQueryRunnerTest.java
Modified: commons/proper/dbutils/trunk/src/main/java/org/apache/commons/dbutils/AsyncQueryRunner.java
URL: http://svn.apache.org/viewvc/commons/proper/dbutils/trunk/src/main/java/org/apache/commons/dbutils/AsyncQueryRunner.java?rev=1176052&r1=1176051&r2=1176052&view=diff
==============================================================================
--- commons/proper/dbutils/trunk/src/main/java/org/apache/commons/dbutils/AsyncQueryRunner.java (original)
+++ commons/proper/dbutils/trunk/src/main/java/org/apache/commons/dbutils/AsyncQueryRunner.java Mon Sep 26 20:59:29 2011
@@ -21,6 +21,8 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import javax.sql.DataSource;
@@ -33,11 +35,13 @@ import javax.sql.DataSource;
*/
public class AsyncQueryRunner extends AbstractQueryRunner {
+ private final ExecutorService executorService;
+
/**
* Constructor for AsyncQueryRunner.
*/
- public AsyncQueryRunner() {
- super(null, false);
+ public AsyncQueryRunner(ExecutorService executorService) {
+ this(null, false, executorService);
}
/**
@@ -46,8 +50,8 @@ public class AsyncQueryRunner extends Ab
* if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
* and if it breaks, we'll remember not to use it again.
*/
- public AsyncQueryRunner(boolean pmdKnownBroken) {
- super(null, pmdKnownBroken);
+ public AsyncQueryRunner(boolean pmdKnownBroken, ExecutorService executorService) {
+ this(null, pmdKnownBroken, executorService);
}
/**
@@ -57,8 +61,8 @@ public class AsyncQueryRunner extends Ab
*
* @param ds The <code>DataSource</code> to retrieve connections from.
*/
- public AsyncQueryRunner(DataSource ds) {
- super(ds, false);
+ public AsyncQueryRunner(DataSource ds, ExecutorService executorService) {
+ this(ds, false, executorService);
}
/**
@@ -71,8 +75,9 @@ public class AsyncQueryRunner extends Ab
* if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
* and if it breaks, we'll remember not to use it again.
*/
- public AsyncQueryRunner(DataSource ds, boolean pmdKnownBroken) {
+ public AsyncQueryRunner(DataSource ds, boolean pmdKnownBroken, ExecutorService executorService) {
super(ds, pmdKnownBroken);
+ this.executorService = executorService;
}
/**
@@ -136,11 +141,11 @@ public class AsyncQueryRunner extends Ab
* @param sql The SQL to execute.
* @param params An array of query replacement parameters. Each row in
* this array is one set of batch replacement values.
- * @return A <code>Callable</code> which returns the number of rows updated per statement.
+ * @return A <code>Future</code> which returns the number of rows updated per statement.
* @throws SQLException if a database access error occurs
*/
- public Callable<int[]> batch(Connection conn, String sql, Object[][] params) throws SQLException {
- return this.batch(conn, false, sql, params);
+ public Future<int[]> batch(Connection conn, String sql, Object[][] params) throws SQLException {
+ return executorService.submit(this.batch(conn, false, sql, params));
}
/**
@@ -152,13 +157,13 @@ public class AsyncQueryRunner extends Ab
* @param sql The SQL to execute.
* @param params An array of query replacement parameters. Each row in
* this array is one set of batch replacement values.
- * @return A <code>Callable</code> which returns the number of rows updated per statement.
+ * @return A <code>Future</code> which returns the number of rows updated per statement.
* @throws SQLException if a database access error occurs
*/
- public Callable<int[]> batch(String sql, Object[][] params) throws SQLException {
+ public Future<int[]> batch(String sql, Object[][] params) throws SQLException {
Connection conn = this.prepareConnection();
- return this.batch(conn, true, sql, params);
+ return executorService.submit(this.batch(conn, true, sql, params));
}
/**
@@ -234,7 +239,7 @@ public class AsyncQueryRunner extends Ab
* @param params An array of query replacement parameters. Each row in
* this array is one set of batch replacement values.
*/
- public QueryCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps,
+ public QueryCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps,
ResultSetHandler<T> rsh, String sql, Object... params) {
this.sql = sql;
this.params = params;
@@ -334,11 +339,11 @@ public class AsyncQueryRunner extends Ab
* @param sql The query to execute.
* @param rsh The handler that converts the results into an object.
* @param params The replacement parameters.
- * @return A <code>Callable</code> which returns the result of the query call.
+ * @return A <code>Future</code> which returns the result of the query call.
* @throws SQLException if a database access error occurs
*/
- public <T> Callable<T> query(Connection conn, String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
- return query(conn, false, sql, rsh, params);
+ public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
+ return executorService.submit(query(conn, false, sql, rsh, params));
}
/**
@@ -348,11 +353,11 @@ public class AsyncQueryRunner extends Ab
* @param conn The connection to execute the query in.
* @param sql The query to execute.
* @param rsh The handler that converts the results into an object.
- * @return A <code>Callable</code> which returns the result of the query call.
+ * @return A <code>Future</code> which returns the result of the query call.
* @throws SQLException if a database access error occurs
*/
- public <T> Callable<T> query(Connection conn, String sql, ResultSetHandler<T> rsh) throws SQLException {
- return this.query(conn, false, sql, rsh, (Object[]) null);
+ public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh) throws SQLException {
+ return executorService.submit(this.query(conn, false, sql, rsh, (Object[]) null));
}
/**
@@ -365,12 +370,12 @@ public class AsyncQueryRunner extends Ab
* the <code>ResultSet</code>.
* @param params Initialize the PreparedStatement's IN parameters with
* this array.
- * @return A <code>Callable</code> which returns the result of the query call.
+ * @return A <code>Future</code> which returns the result of the query call.
* @throws SQLException if a database access error occurs
*/
- public <T> Callable<T> query(String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
+ public <T> Future<T> query(String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
Connection conn = this.prepareConnection();
- return this.query(conn, true, sql, rsh, params);
+ return executorService.submit(this.query(conn, true, sql, rsh, params));
}
/**
@@ -382,12 +387,12 @@ public class AsyncQueryRunner extends Ab
* @param rsh The handler used to create the result object from
* the <code>ResultSet</code>.
*
- * @return A <code>Callable</code> which returns the result of the query call.
+ * @return A <code>Future</code> which returns the result of the query call.
* @throws SQLException if a database access error occurs
*/
- public <T> Callable<T> query(String sql, ResultSetHandler<T> rsh) throws SQLException {
+ public <T> Future<T> query(String sql, ResultSetHandler<T> rsh) throws SQLException {
Connection conn = this.prepareConnection();
- return this.query(conn, true, sql, rsh, (Object[]) null);
+ return executorService.submit(this.query(conn, true, sql, rsh, (Object[]) null));
}
/**
@@ -493,11 +498,11 @@ public class AsyncQueryRunner extends Ab
*
* @param conn The connection to use to run the query.
* @param sql The SQL to execute.
- * @return A <code>Callable</code> which returns the number of rows updated.
+ * @return A <code>Future</code> which returns the number of rows updated.
* @throws SQLException if a database access error occurs
*/
- public Callable<Integer> update(Connection conn, String sql) throws SQLException {
- return this.update(conn, false, sql, (Object[]) null);
+ public Future<Integer> update(Connection conn, String sql) throws SQLException {
+ return executorService.submit(this.update(conn, false, sql, (Object[]) null));
}
/**
@@ -507,11 +512,11 @@ public class AsyncQueryRunner extends Ab
* @param conn The connection to use to run the query.
* @param sql The SQL to execute.
* @param param The replacement parameter.
- * @return A <code>Callable</code> which returns the number of rows updated.
+ * @return A <code>Future</code> which returns the number of rows updated.
* @throws SQLException if a database access error occurs
*/
- public Callable<Integer> update(Connection conn, String sql, Object param) throws SQLException {
- return this.update(conn, false, sql, new Object[] { param });
+ public Future<Integer> update(Connection conn, String sql, Object param) throws SQLException {
+ return executorService.submit(this.update(conn, false, sql, new Object[] { param }));
}
/**
@@ -520,11 +525,11 @@ public class AsyncQueryRunner extends Ab
* @param conn The connection to use to run the query.
* @param sql The SQL to execute.
* @param params The query replacement parameters.
- * @return A <code>Callable</code> which returns the number of rows updated.
+ * @return A <code>Future</code> which returns the number of rows updated.
* @throws SQLException if a database access error occurs
*/
- public Callable<Integer> update(Connection conn, String sql, Object... params) throws SQLException {
- return this.update(conn, false, sql, params);
+ public Future<Integer> update(Connection conn, String sql, Object... params) throws SQLException {
+ return executorService.submit(this.update(conn, false, sql, params));
}
/**
@@ -536,11 +541,11 @@ public class AsyncQueryRunner extends Ab
*
* @param sql The SQL statement to execute.
* @throws SQLException if a database access error occurs
- * @return A <code>Callable</code> which returns the number of rows updated.
+ * @return A <code>Future</code> which returns the number of rows updated.
*/
- public Callable<Integer> update(String sql) throws SQLException {
+ public Future<Integer> update(String sql) throws SQLException {
Connection conn = this.prepareConnection();
- return this.update(conn, true, sql, (Object[]) null);
+ return executorService.submit(this.update(conn, true, sql, (Object[]) null));
}
/**
@@ -553,11 +558,11 @@ public class AsyncQueryRunner extends Ab
* @param sql The SQL statement to execute.
* @param param The replacement parameter.
* @throws SQLException if a database access error occurs
- * @return A <code>Callable</code> which returns the number of rows updated.
+ * @return A <code>Future</code> which returns the number of rows updated.
*/
- public Callable<Integer> update(String sql, Object param) throws SQLException {
+ public Future<Integer> update(String sql, Object param) throws SQLException {
Connection conn = this.prepareConnection();
- return this.update(conn, true, sql, new Object[] { param });
+ return executorService.submit(this.update(conn, true, sql, new Object[] { param }));
}
/**
@@ -570,11 +575,11 @@ public class AsyncQueryRunner extends Ab
* @param params Initializes the PreparedStatement's IN (i.e. '?')
* parameters.
* @throws SQLException if a database access error occurs
- * @return A <code>Callable</code> which returns the number of rows updated.
+ * @return A <code>Future</code> which returns the number of rows updated.
*/
- public Callable<Integer> update(String sql, Object... params) throws SQLException {
+ public Future<Integer> update(String sql, Object... params) throws SQLException {
Connection conn = this.prepareConnection();
- return this.update(conn, true, sql, params);
+ return executorService.submit(this.update(conn, true, sql, params));
}
}
Modified: commons/proper/dbutils/trunk/src/test/java/org/apache/commons/dbutils/AsyncQueryRunnerTest.java
URL: http://svn.apache.org/viewvc/commons/proper/dbutils/trunk/src/test/java/org/apache/commons/dbutils/AsyncQueryRunnerTest.java?rev=1176052&r1=1176051&r2=1176052&view=diff
==============================================================================
--- commons/proper/dbutils/trunk/src/test/java/org/apache/commons/dbutils/AsyncQueryRunnerTest.java (original)
+++ commons/proper/dbutils/trunk/src/test/java/org/apache/commons/dbutils/AsyncQueryRunnerTest.java Mon Sep 26 20:59:29 2011
@@ -30,6 +30,8 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import javax.sql.DataSource;
@@ -62,7 +64,7 @@ public class AsyncQueryRunnerTest {
when(results.next()).thenReturn(false);
handler = new ArrayHandler();
- runner = new AsyncQueryRunner(dataSource);
+ runner = new AsyncQueryRunner(dataSource, Executors.newFixedThreadPool(1));
}
//
@@ -70,11 +72,11 @@ public class AsyncQueryRunnerTest {
//
private void callGoodBatch(Connection conn, Object[][] params) throws Exception {
when(meta.getParameterCount()).thenReturn(2);
- Callable<int[]> callable = runner.batch(conn, "select * from blah where ? = ?", params);
+ Future<int[]> callable = runner.batch(conn, "select * from blah where ? = ?", params);
verify(stmt, times(2)).addBatch();
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeBatch();
verify(stmt, times(1)).close(); // make sure we closed the statement
verify(conn, times(0)).close(); // make sure we did NOT close the caller-supplied connection
@@ -82,11 +84,10 @@ public class AsyncQueryRunnerTest {
private void callGoodBatch(Object[][] params) throws Exception {
when(meta.getParameterCount()).thenReturn(2);
- Callable<int[]> callable = runner.batch("select * from blah where ? = ?", params);
+ Future<int[]> callable = runner.batch("select * from blah where ? = ?", params);
verify(stmt, times(2)).addBatch();
- verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeBatch();
verify(stmt, times(1)).close(); // make sure we closed the statement
verify(conn, times(1)).close(); // make sure we closed the connection
@@ -101,7 +102,7 @@ public class AsyncQueryRunnerTest {
@Test
public void testGoodBatchPmdTrue() throws Exception {
- runner = new AsyncQueryRunner(dataSource, true);
+ runner = new AsyncQueryRunner(dataSource, true, Executors.newFixedThreadPool(1));
String[][] params = new String[][] { { "unit", "unit" }, { "test", "test" } };
callGoodBatch(params);
@@ -109,7 +110,7 @@ public class AsyncQueryRunnerTest {
@Test
public void testGoodBatchDefaultConstructor() throws Exception {
- runner = new AsyncQueryRunner();
+ runner = new AsyncQueryRunner(Executors.newFixedThreadPool(1));
String[][] params = new String[][] { { "unit", "unit" }, { "test", "test" } };
callGoodBatch(conn, params);
@@ -126,7 +127,7 @@ public class AsyncQueryRunnerTest {
// helper method for calling batch when an exception is expected
private void callBatchWithException(String sql, Object[][] params) throws Exception {
- Callable<int[]> callable = null;
+ Future<int[]> callable = null;
boolean caught = false;
try {
@@ -134,7 +135,7 @@ public class AsyncQueryRunnerTest {
verify(stmt, times(2)).addBatch();
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeBatch();
verify(stmt, times(1)).close(); // make sure the statement is closed
verify(conn, times(1)).close(); // make sure the connection is closed
@@ -210,10 +211,10 @@ public class AsyncQueryRunnerTest {
//
private void callGoodQuery(Connection conn) throws Exception {
when(meta.getParameterCount()).thenReturn(2);
- Callable<Object[]> callable = runner.query(conn, "select * from blah where ? = ?", handler, "unit", "test");
+ Future<Object[]> callable = runner.query(conn, "select * from blah where ? = ?", handler, "unit", "test");
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeQuery();
verify(results, times(1)).close();
verify(stmt, times(1)).close(); // make sure we closed the statement
@@ -224,7 +225,7 @@ public class AsyncQueryRunnerTest {
callable = runner.query(conn, "select * from blah", handler);
verify(stmt, times(1)).close(); // make sure the statement has only been closed once
- callable.call();
+ callable.get();
verify(stmt, times(2)).executeQuery();
verify(results, times(2)).close();
verify(stmt, times(2)).close(); // make sure we closed the statement
@@ -233,10 +234,10 @@ public class AsyncQueryRunnerTest {
private void callGoodQuery() throws Exception {
when(meta.getParameterCount()).thenReturn(2);
- Callable<Object[]> callable = runner.query("select * from blah where ? = ?", handler, "unit", "test");
+ Future<Object[]> callable = runner.query("select * from blah where ? = ?", handler, "unit", "test");
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeQuery();
verify(results, times(1)).close();
verify(stmt, times(1)).close(); // make sure we closed the statement
@@ -247,7 +248,7 @@ public class AsyncQueryRunnerTest {
callable = runner.query("select * from blah", handler);
verify(stmt, times(1)).close(); // make sure the statement has only been closed once
- callable.call();
+ callable.get();
verify(stmt, times(2)).executeQuery();
verify(results, times(2)).close();
verify(stmt, times(2)).close(); // make sure we closed the statement
@@ -261,20 +262,20 @@ public class AsyncQueryRunnerTest {
@Test
public void testGoodQueryPmdTrue() throws Exception {
- runner = new AsyncQueryRunner(true);
+ runner = new AsyncQueryRunner(true, Executors.newFixedThreadPool(1));
callGoodQuery(conn);
}
@Test
public void testGoodQueryDefaultConstructor() throws Exception {
- runner = new AsyncQueryRunner();
+ runner = new AsyncQueryRunner(Executors.newFixedThreadPool(1));
callGoodQuery(conn);
}
// helper method for calling query when an exception is expected
private void callQueryWithException(Object... params) throws Exception {
- Callable<Object[]> callable = null;
+ Future<Object[]> callable = null;
boolean caught = false;
try {
@@ -282,7 +283,7 @@ public class AsyncQueryRunnerTest {
callable = runner.query("select * from blah where ? = ?", handler, params);
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeQuery();
verify(results, times(1)).close();
verify(stmt, times(1)).close(); // make sure we closed the statement
@@ -345,10 +346,10 @@ public class AsyncQueryRunnerTest {
//
private void callGoodUpdate(Connection conn) throws Exception {
when(meta.getParameterCount()).thenReturn(2);
- Callable<Integer> callable = runner.update(conn, "update blah set ? = ?", "unit", "test");
+ Future<Integer> callable = runner.update(conn, "update blah set ? = ?", "unit", "test");
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeUpdate();
verify(stmt, times(1)).close(); // make sure we closed the statement
verify(conn, times(0)).close(); // make sure we did NOT close the caller-supplied connection
@@ -356,9 +357,8 @@ public class AsyncQueryRunnerTest {
// call the other variation
when(meta.getParameterCount()).thenReturn(0);
callable = runner.update(conn, "update blah set unit = test");
- verify(stmt, times(1)).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(2)).executeUpdate();
verify(stmt, times(2)).close(); // make sure we closed the statement
verify(conn, times(0)).close(); // make sure we did NOT close the caller-supplied connection
@@ -366,9 +366,8 @@ public class AsyncQueryRunnerTest {
// call the other variation
when(meta.getParameterCount()).thenReturn(1);
callable = runner.update(conn, "update blah set unit = ?", "test");
- verify(stmt, times(2)).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(3)).executeUpdate();
verify(stmt, times(3)).close(); // make sure we closed the statement
verify(conn, times(0)).close(); // make sure we did NOT close the caller-supplied connection
@@ -376,10 +375,10 @@ public class AsyncQueryRunnerTest {
private void callGoodUpdate() throws Exception {
when(meta.getParameterCount()).thenReturn(2);
- Callable<Integer> callable = runner.update("update blah set ? = ?", "unit", "test");
+ Future<Integer> callable = runner.update("update blah set ? = ?", "unit", "test");
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeUpdate();
verify(stmt, times(1)).close(); // make sure we closed the statement
verify(conn, times(1)).close(); // make sure we closed the connection
@@ -387,9 +386,8 @@ public class AsyncQueryRunnerTest {
// call the other variation
when(meta.getParameterCount()).thenReturn(0);
callable = runner.update("update blah set unit = test");
- verify(stmt, times(1)).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(2)).executeUpdate();
verify(stmt, times(2)).close(); // make sure we closed the statement
verify(conn, times(2)).close(); // make sure we closed the connection
@@ -397,9 +395,8 @@ public class AsyncQueryRunnerTest {
// call the other variation
when(meta.getParameterCount()).thenReturn(1);
callable = runner.update("update blah set unit = ?", "test");
- verify(stmt, times(2)).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(3)).executeUpdate();
verify(stmt, times(3)).close(); // make sure we closed the statement
verify(conn, times(3)).close(); // make sure we closed the connection
@@ -412,19 +409,19 @@ public class AsyncQueryRunnerTest {
@Test
public void testGoodUpdatePmdTrue() throws Exception {
- runner = new AsyncQueryRunner(true);
+ runner = new AsyncQueryRunner(true, Executors.newFixedThreadPool(1));
callGoodUpdate(conn);
}
@Test
public void testGoodUpdateDefaultConstructor() throws Exception {
- runner = new AsyncQueryRunner();
+ runner = new AsyncQueryRunner(Executors.newFixedThreadPool(1));
callGoodUpdate(conn);
}
// helper method for calling update when an exception is expected
private void callUpdateWithException(Object... params) throws Exception {
- Callable<Integer> callable = null;
+ Future<Integer> callable = null;
boolean caught = false;
try {
@@ -432,7 +429,7 @@ public class AsyncQueryRunnerTest {
callable = runner.update("select * from blah where ? = ?", params);
verify(stmt, never()).close(); // make sure the statement is still open
- callable.call();
+ callable.get();
verify(stmt, times(1)).executeUpdate();
verify(stmt, times(1)).close(); // make sure we closed the statement
verify(conn, times(1)).close(); // make sure we closed the connection
@@ -513,7 +510,7 @@ public class AsyncQueryRunnerTest {
@Test(expected=SQLException.class)
public void testBadPrepareConnection() throws Exception {
- runner = new AsyncQueryRunner();
+ runner = new AsyncQueryRunner(Executors.newFixedThreadPool(1));
runner.update("update blah set unit = test");
}
}