You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:14 UTC
[19/50] [abbrv] beam git commit: [BEAM-1922] Close datasource in
JdbcIO when possible
[BEAM-1922] Close datasource in JdbcIO when possible
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc846268
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc846268
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc846268
Branch: refs/heads/jstorm-runner
Commit: dc84626877a0e7183ed660df167a1d02d1589f90
Parents: 946778c
Author: mingmxu <mi...@ebay.com>
Authored: Mon Apr 10 11:19:02 2017 -0700
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Apr 17 18:07:16 2017 +0200
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 40 +++++++++++---------
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 10 ++---
2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dc846268/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 05a30a4..b26a47d 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
-
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -55,15 +54,13 @@ import org.apache.commons.dbcp2.BasicDataSource;
* <p>JdbcIO source returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is the
* type returned by the provided {@link RowMapper}.
*
- * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using
- * {@link DataSourceConfiguration#create(DataSource)} or
- * {@link DataSourceConfiguration#create(String, String)} with either a
- * {@link DataSource} (which must be {@link Serializable}) or the parameters needed to create it
- * (driver class name and url). Optionally, {@link DataSourceConfiguration#withUsername(String)} and
- * {@link DataSourceConfiguration#withPassword(String)} allows you to define DataSource username
- * and password.
- * For example:
+ * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using<br>
+ * 1. {@link DataSourceConfiguration#create(DataSource)}(which must be {@link Serializable});<br>
+ * 2. or {@link DataSourceConfiguration#create(String, String)}(driver class name and url).
+ * Optionally, {@link DataSourceConfiguration#withUsername(String)} and
+ * {@link DataSourceConfiguration#withPassword(String)} allows you to define username and password.
*
+ * <p>For example:
* <pre>{@code
* pipeline.apply(JdbcIO.<KV<Integer, String>>read()
* .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
@@ -245,11 +242,9 @@ public class JdbcIO {
}
}
- Connection getConnection() throws Exception {
+ DataSource buildDatasource() throws Exception{
if (getDataSource() != null) {
- return (getUsername() != null)
- ? getDataSource().getConnection(getUsername(), getPassword())
- : getDataSource().getConnection();
+ return getDataSource();
} else {
BasicDataSource basicDataSource = new BasicDataSource();
basicDataSource.setDriverClassName(getDriverClassName());
@@ -259,9 +254,10 @@ public class JdbcIO {
if (getConnectionProperties() != null) {
basicDataSource.setConnectionProperties(getConnectionProperties());
}
- return basicDataSource.getConnection();
+ return basicDataSource;
}
}
+
}
/**
@@ -368,6 +364,7 @@ public class JdbcIO {
/** A {@link DoFn} executing the SQL query to read from the database. */
static class ReadFn<T> extends DoFn<String, T> {
private JdbcIO.Read<T> spec;
+ private DataSource dataSource;
private Connection connection;
private ReadFn(Read<T> spec) {
@@ -376,7 +373,8 @@ public class JdbcIO {
@Setup
public void setup() throws Exception {
- connection = spec.getDataSourceConfiguration().getConnection();
+ dataSource = spec.getDataSourceConfiguration().buildDatasource();
+ connection = dataSource.getConnection();
}
@ProcessElement
@@ -396,8 +394,9 @@ public class JdbcIO {
@Teardown
public void teardown() throws Exception {
- if (connection != null) {
- connection.close();
+ connection.close();
+ if (dataSource instanceof AutoCloseable) {
+ ((AutoCloseable) dataSource).close();
}
}
}
@@ -462,6 +461,7 @@ public class JdbcIO {
private final Write<T> spec;
+ private DataSource dataSource;
private Connection connection;
private PreparedStatement preparedStatement;
private int batchCount;
@@ -472,7 +472,8 @@ public class JdbcIO {
@Setup
public void setup() throws Exception {
- connection = spec.getDataSourceConfiguration().getConnection();
+ dataSource = spec.getDataSourceConfiguration().buildDatasource();
+ connection = dataSource.getConnection();
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement(spec.getStatement());
}
@@ -516,6 +517,9 @@ public class JdbcIO {
if (connection != null) {
connection.close();
}
+ if (dataSource instanceof AutoCloseable) {
+ ((AutoCloseable) dataSource).close();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dc846268/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 4e82338..984ce1a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -126,7 +126,7 @@ public class JdbcIOTest implements Serializable {
@Test
public void testDataSourceConfigurationDataSource() throws Exception {
JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);
- try (Connection conn = config.getConnection()) {
+ try (Connection conn = config.buildDatasource().getConnection()) {
assertTrue(conn.isValid(0));
}
}
@@ -136,7 +136,7 @@ public class JdbcIOTest implements Serializable {
JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
"org.apache.derby.jdbc.ClientDriver",
"jdbc:derby://localhost:" + port + "/target/beam");
- try (Connection conn = config.getConnection()) {
+ try (Connection conn = config.buildDatasource().getConnection()) {
assertTrue(conn.isValid(0));
}
}
@@ -148,7 +148,7 @@ public class JdbcIOTest implements Serializable {
"jdbc:derby://localhost:" + port + "/target/beam")
.withUsername("sa")
.withPassword("sa");
- try (Connection conn = config.getConnection()) {
+ try (Connection conn = config.buildDatasource().getConnection()) {
assertTrue(conn.isValid(0));
}
}
@@ -160,7 +160,7 @@ public class JdbcIOTest implements Serializable {
"jdbc:derby://localhost:" + port + "/target/beam")
.withUsername("sa")
.withPassword(null);
- try (Connection conn = config.getConnection()) {
+ try (Connection conn = config.buildDatasource().getConnection()) {
assertTrue(conn.isValid(0));
}
}
@@ -172,7 +172,7 @@ public class JdbcIOTest implements Serializable {
"jdbc:derby://localhost:" + port + "/target/beam")
.withUsername(null)
.withPassword(null);
- try (Connection conn = config.getConnection()) {
+ try (Connection conn = config.buildDatasource().getConnection()) {
assertTrue(conn.isValid(0));
}
}