You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2019/05/21 16:25:27 UTC

[beam] branch master updated: [BEAM-7230] Make PoolableDataSourceProvider a static singleton And simplifies all the inner method construction that was error-prone.

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b73a96  [BEAM-7230] Make PoolableDataSourceProvider a static singleton And simplifies all the inner method construction that was error-prone.
     new a37ba1a  Merge pull request #8635 from iemejia/BEAM-7230-jdbc-fix-pool-instantiation
8b73a96 is described below

commit 8b73a962190907c561ab9c9895fb0911632daac5
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Tue May 21 15:17:47 2019 +0200

    [BEAM-7230] Make PoolableDataSourceProvider a static singleton
    And simplifies all the inner method construction that was error-prone.
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 122 ++++++++-------------
 1 file changed, 47 insertions(+), 75 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8bd4f7e..6d6b5e0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -382,46 +382,6 @@ public class JdbcIO {
     }
   }
 
-  /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. */
-  public static class PoolableDataSourceProvider extends BaseDataSourceProvider {
-    private static SerializableFunction<Void, DataSource> instance = null;
-
-    private PoolableDataSourceProvider(
-        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
-      super(dataSourceProviderFn);
-    }
-
-    public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
-      if (instance == null) {
-        instance =
-            MemoizedDataSourceProvider.of(
-                new PoolableDataSourceProvider(
-                    DataSourceProviderFromDataSourceConfiguration.of(config)));
-      }
-      return instance;
-    }
-
-    @Override
-    public DataSource apply(Void input) {
-      DataSource current = super.dataSourceProviderFn.apply(input);
-      // wrapping the datasource as a pooling datasource
-      DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
-      PoolableConnectionFactory poolableConnectionFactory =
-          new PoolableConnectionFactory(connectionFactory, null);
-      GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
-      poolConfig.setMaxTotal(1);
-      poolConfig.setMinIdle(0);
-      poolConfig.setMinEvictableIdleTimeMillis(10000);
-      poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
-      GenericObjectPool connectionPool =
-          new GenericObjectPool(poolableConnectionFactory, poolConfig);
-      poolableConnectionFactory.setPool(connectionPool);
-      poolableConnectionFactory.setDefaultAutoCommit(false);
-      poolableConnectionFactory.setDefaultReadOnly(false);
-      return new PoolingDataSource(connectionPool);
-    }
-  }
-
   /**
    * An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement}
    * used to setParameters into the database.
@@ -1112,39 +1072,50 @@ public class JdbcIO {
     }
   }
 
-  private static class DataSourceProviderFromDataSourceConfiguration
+  /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. */
+  public static class PoolableDataSourceProvider
       implements SerializableFunction<Void, DataSource>, HasDisplayData {
-    private final DataSourceConfiguration config;
-    private static DataSourceProviderFromDataSourceConfiguration instance;
+    private static PoolableDataSourceProvider instance;
+    private static transient DataSource source;
+    private static SerializableFunction<Void, DataSource> dataSourceProviderFn;
 
-    private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) {
-      this.config = config;
+    private PoolableDataSourceProvider(DataSourceConfiguration config) {
+      dataSourceProviderFn = DataSourceProviderFromDataSourceConfiguration.of(config);
     }
 
-    public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
+    public static synchronized SerializableFunction<Void, DataSource> of(
+        DataSourceConfiguration config) {
       if (instance == null) {
-        instance = new DataSourceProviderFromDataSourceConfiguration(config);
+        instance = new PoolableDataSourceProvider(config);
       }
       return instance;
     }
 
     @Override
     public DataSource apply(Void input) {
-      return config.buildDatasource();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      config.populateDisplayData(builder);
-    }
-  }
-
-  private abstract static class BaseDataSourceProvider
-      implements SerializableFunction<Void, DataSource>, HasDisplayData {
-    private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
-
-    BaseDataSourceProvider(SerializableFunction<Void, DataSource> dataSourceProviderFn) {
-      this.dataSourceProviderFn = dataSourceProviderFn;
+      return buildDataSource(input);
+    }
+
+    static synchronized DataSource buildDataSource(Void input) {
+      if (source == null) {
+        DataSource basicSource = dataSourceProviderFn.apply(input);
+        DataSourceConnectionFactory connectionFactory =
+            new DataSourceConnectionFactory(basicSource);
+        PoolableConnectionFactory poolableConnectionFactory =
+            new PoolableConnectionFactory(connectionFactory, null);
+        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+        poolConfig.setMaxTotal(1);
+        poolConfig.setMinIdle(0);
+        poolConfig.setMinEvictableIdleTimeMillis(10000);
+        poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
+        GenericObjectPool connectionPool =
+            new GenericObjectPool(poolableConnectionFactory, poolConfig);
+        poolableConnectionFactory.setPool(connectionPool);
+        poolableConnectionFactory.setDefaultAutoCommit(false);
+        poolableConnectionFactory.setDefaultReadOnly(false);
+        source = new PoolingDataSource(connectionPool);
+      }
+      return source;
     }
 
     @Override
@@ -1155,29 +1126,30 @@ public class JdbcIO {
     }
   }
 
-  private static class MemoizedDataSourceProvider extends BaseDataSourceProvider {
-    private static MemoizedDataSourceProvider instance = null;
-    @Nullable private static DataSource datasource = null;
+  private static class DataSourceProviderFromDataSourceConfiguration
+      implements SerializableFunction<Void, DataSource>, HasDisplayData {
+    private final DataSourceConfiguration config;
+    private static DataSourceProviderFromDataSourceConfiguration instance;
 
-    private MemoizedDataSourceProvider(
-        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
-      super(dataSourceProviderFn);
+    private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) {
+      this.config = config;
     }
 
-    public static MemoizedDataSourceProvider of(
-        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+    public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
       if (instance == null) {
-        instance = new MemoizedDataSourceProvider(dataSourceProviderFn);
+        instance = new DataSourceProviderFromDataSourceConfiguration(config);
       }
       return instance;
     }
 
     @Override
     public DataSource apply(Void input) {
-      if (datasource == null) {
-        datasource = super.dataSourceProviderFn.apply(null);
-      }
-      return datasource;
+      return config.buildDatasource();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      config.populateDisplayData(builder);
     }
   }
 }