You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/17 16:08:03 UTC

[1/2] beam git commit: [BEAM-1922] Close datasource in JdbcIO when possible

Repository: beam
Updated Branches:
  refs/heads/master 946778c5b -> 588a4d00e


[BEAM-1922] Close datasource in JdbcIO when possible


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc846268
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc846268
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc846268

Branch: refs/heads/master
Commit: dc84626877a0e7183ed660df167a1d02d1589f90
Parents: 946778c
Author: mingmxu <mi...@ebay.com>
Authored: Mon Apr 10 11:19:02 2017 -0700
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Apr 17 18:07:16 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 40 +++++++++++---------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 10 ++---
 2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dc846268/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 05a30a4..b26a47d 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
-
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -55,15 +54,13 @@ import org.apache.commons.dbcp2.BasicDataSource;
  * <p>JdbcIO source returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is the
  * type returned by the provided {@link RowMapper}.
  *
- * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using
- * {@link DataSourceConfiguration#create(DataSource)} or
- * {@link DataSourceConfiguration#create(String, String)} with either a
- * {@link DataSource} (which must be {@link Serializable}) or the parameters needed to create it
- * (driver class name and url). Optionally, {@link DataSourceConfiguration#withUsername(String)} and
- * {@link DataSourceConfiguration#withPassword(String)} allows you to define DataSource username
- * and password.
- * For example:
+ * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using<br>
+ * 1. {@link DataSourceConfiguration#create(DataSource)}(which must be {@link Serializable});<br>
+ * 2. or {@link DataSourceConfiguration#create(String, String)}(driver class name and url).
+ * Optionally, {@link DataSourceConfiguration#withUsername(String)} and
+ * {@link DataSourceConfiguration#withPassword(String)} allows you to define username and password.
  *
+ * <p>For example:
  * <pre>{@code
  * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
  *   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
@@ -245,11 +242,9 @@ public class JdbcIO {
       }
     }
 
-    Connection getConnection() throws Exception {
+    DataSource buildDatasource() throws Exception{
       if (getDataSource() != null) {
-        return (getUsername() != null)
-            ? getDataSource().getConnection(getUsername(), getPassword())
-            : getDataSource().getConnection();
+        return getDataSource();
       } else {
         BasicDataSource basicDataSource = new BasicDataSource();
         basicDataSource.setDriverClassName(getDriverClassName());
@@ -259,9 +254,10 @@ public class JdbcIO {
         if (getConnectionProperties() != null) {
           basicDataSource.setConnectionProperties(getConnectionProperties());
         }
-        return basicDataSource.getConnection();
+        return basicDataSource;
       }
     }
+
   }
 
   /**
@@ -368,6 +364,7 @@ public class JdbcIO {
     /** A {@link DoFn} executing the SQL query to read from the database. */
     static class ReadFn<T> extends DoFn<String, T> {
       private JdbcIO.Read<T> spec;
+      private DataSource dataSource;
       private Connection connection;
 
       private ReadFn(Read<T> spec) {
@@ -376,7 +373,8 @@ public class JdbcIO {
 
       @Setup
       public void setup() throws Exception {
-        connection = spec.getDataSourceConfiguration().getConnection();
+        dataSource = spec.getDataSourceConfiguration().buildDatasource();
+        connection = dataSource.getConnection();
       }
 
       @ProcessElement
@@ -396,8 +394,9 @@ public class JdbcIO {
 
       @Teardown
       public void teardown() throws Exception {
-        if (connection != null) {
-          connection.close();
+        connection.close();
+        if (dataSource instanceof AutoCloseable) {
+          ((AutoCloseable) dataSource).close();
         }
       }
     }
@@ -462,6 +461,7 @@ public class JdbcIO {
 
       private final Write<T> spec;
 
+      private DataSource dataSource;
       private Connection connection;
       private PreparedStatement preparedStatement;
       private int batchCount;
@@ -472,7 +472,8 @@ public class JdbcIO {
 
       @Setup
       public void setup() throws Exception {
-        connection = spec.getDataSourceConfiguration().getConnection();
+        dataSource = spec.getDataSourceConfiguration().buildDatasource();
+        connection = dataSource.getConnection();
         connection.setAutoCommit(false);
         preparedStatement = connection.prepareStatement(spec.getStatement());
       }
@@ -516,6 +517,9 @@ public class JdbcIO {
           if (connection != null) {
             connection.close();
           }
+          if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/dc846268/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 4e82338..984ce1a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -126,7 +126,7 @@ public class JdbcIOTest implements Serializable {
   @Test
   public void testDataSourceConfigurationDataSource() throws Exception {
     JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -136,7 +136,7 @@ public class JdbcIOTest implements Serializable {
     JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
         "org.apache.derby.jdbc.ClientDriver",
         "jdbc:derby://localhost:" + port + "/target/beam");
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -148,7 +148,7 @@ public class JdbcIOTest implements Serializable {
         "jdbc:derby://localhost:" + port + "/target/beam")
         .withUsername("sa")
         .withPassword("sa");
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -160,7 +160,7 @@ public class JdbcIOTest implements Serializable {
         "jdbc:derby://localhost:" + port + "/target/beam")
         .withUsername("sa")
         .withPassword(null);
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }
@@ -172,7 +172,7 @@ public class JdbcIOTest implements Serializable {
         "jdbc:derby://localhost:" + port + "/target/beam")
         .withUsername(null)
         .withPassword(null);
-    try (Connection conn = config.getConnection()) {
+    try (Connection conn = config.buildDatasource().getConnection()) {
       assertTrue(conn.isValid(0));
     }
   }


[2/2] beam git commit: [BEAM-1922] This closes #2482

Posted by jb...@apache.org.
[BEAM-1922] This closes #2482


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/588a4d00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/588a4d00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/588a4d00

Branch: refs/heads/master
Commit: 588a4d00e5394cb636a6a7d7c266775ad167909a
Parents: 946778c dc84626
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Mon Apr 17 18:07:56 2017 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Apr 17 18:07:56 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 40 +++++++++++---------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 10 ++---
 2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------