You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2019/05/21 16:25:27 UTC
[beam] branch master updated: [BEAM-7230] Make
PoolableDataSourceProvider a static singleton And simplifies all the inner
method construction that was error-prone.
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8b73a96 [BEAM-7230] Make PoolableDataSourceProvider a static singleton And simplifies all the inner method construction that was error-prone.
new a37ba1a Merge pull request #8635 from iemejia/BEAM-7230-jdbc-fix-pool-instantiation
8b73a96 is described below
commit 8b73a962190907c561ab9c9895fb0911632daac5
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Tue May 21 15:17:47 2019 +0200
[BEAM-7230] Make PoolableDataSourceProvider a static singleton
And simplifies all the inner method construction that was error-prone.
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 122 ++++++++-------------
1 file changed, 47 insertions(+), 75 deletions(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8bd4f7e..6d6b5e0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -382,46 +382,6 @@ public class JdbcIO {
}
}
- /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. */
- public static class PoolableDataSourceProvider extends BaseDataSourceProvider {
- private static SerializableFunction<Void, DataSource> instance = null;
-
- private PoolableDataSourceProvider(
- SerializableFunction<Void, DataSource> dataSourceProviderFn) {
- super(dataSourceProviderFn);
- }
-
- public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
- if (instance == null) {
- instance =
- MemoizedDataSourceProvider.of(
- new PoolableDataSourceProvider(
- DataSourceProviderFromDataSourceConfiguration.of(config)));
- }
- return instance;
- }
-
- @Override
- public DataSource apply(Void input) {
- DataSource current = super.dataSourceProviderFn.apply(input);
- // wrapping the datasource as a pooling datasource
- DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
- PoolableConnectionFactory poolableConnectionFactory =
- new PoolableConnectionFactory(connectionFactory, null);
- GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
- poolConfig.setMaxTotal(1);
- poolConfig.setMinIdle(0);
- poolConfig.setMinEvictableIdleTimeMillis(10000);
- poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
- GenericObjectPool connectionPool =
- new GenericObjectPool(poolableConnectionFactory, poolConfig);
- poolableConnectionFactory.setPool(connectionPool);
- poolableConnectionFactory.setDefaultAutoCommit(false);
- poolableConnectionFactory.setDefaultReadOnly(false);
- return new PoolingDataSource(connectionPool);
- }
- }
-
/**
* An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement}
* used to setParameters into the database.
@@ -1112,39 +1072,50 @@ public class JdbcIO {
}
}
- private static class DataSourceProviderFromDataSourceConfiguration
+ /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. */
+ public static class PoolableDataSourceProvider
implements SerializableFunction<Void, DataSource>, HasDisplayData {
- private final DataSourceConfiguration config;
- private static DataSourceProviderFromDataSourceConfiguration instance;
+ private static PoolableDataSourceProvider instance;
+ private static transient DataSource source;
+ private static SerializableFunction<Void, DataSource> dataSourceProviderFn;
- private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) {
- this.config = config;
+ private PoolableDataSourceProvider(DataSourceConfiguration config) {
+ dataSourceProviderFn = DataSourceProviderFromDataSourceConfiguration.of(config);
}
- public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
+ public static synchronized SerializableFunction<Void, DataSource> of(
+ DataSourceConfiguration config) {
if (instance == null) {
- instance = new DataSourceProviderFromDataSourceConfiguration(config);
+ instance = new PoolableDataSourceProvider(config);
}
return instance;
}
@Override
public DataSource apply(Void input) {
- return config.buildDatasource();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- config.populateDisplayData(builder);
- }
- }
-
- private abstract static class BaseDataSourceProvider
- implements SerializableFunction<Void, DataSource>, HasDisplayData {
- private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
-
- BaseDataSourceProvider(SerializableFunction<Void, DataSource> dataSourceProviderFn) {
- this.dataSourceProviderFn = dataSourceProviderFn;
+ return buildDataSource(input);
+ }
+
+ static synchronized DataSource buildDataSource(Void input) {
+ if (source == null) {
+ DataSource basicSource = dataSourceProviderFn.apply(input);
+ DataSourceConnectionFactory connectionFactory =
+ new DataSourceConnectionFactory(basicSource);
+ PoolableConnectionFactory poolableConnectionFactory =
+ new PoolableConnectionFactory(connectionFactory, null);
+ GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+ poolConfig.setMaxTotal(1);
+ poolConfig.setMinIdle(0);
+ poolConfig.setMinEvictableIdleTimeMillis(10000);
+ poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
+ GenericObjectPool connectionPool =
+ new GenericObjectPool(poolableConnectionFactory, poolConfig);
+ poolableConnectionFactory.setPool(connectionPool);
+ poolableConnectionFactory.setDefaultAutoCommit(false);
+ poolableConnectionFactory.setDefaultReadOnly(false);
+ source = new PoolingDataSource(connectionPool);
+ }
+ return source;
}
@Override
@@ -1155,29 +1126,30 @@ public class JdbcIO {
}
}
- private static class MemoizedDataSourceProvider extends BaseDataSourceProvider {
- private static MemoizedDataSourceProvider instance = null;
- @Nullable private static DataSource datasource = null;
+ private static class DataSourceProviderFromDataSourceConfiguration
+ implements SerializableFunction<Void, DataSource>, HasDisplayData {
+ private final DataSourceConfiguration config;
+ private static DataSourceProviderFromDataSourceConfiguration instance;
- private MemoizedDataSourceProvider(
- SerializableFunction<Void, DataSource> dataSourceProviderFn) {
- super(dataSourceProviderFn);
+ private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) {
+ this.config = config;
}
- public static MemoizedDataSourceProvider of(
- SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+ public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
if (instance == null) {
- instance = new MemoizedDataSourceProvider(dataSourceProviderFn);
+ instance = new DataSourceProviderFromDataSourceConfiguration(config);
}
return instance;
}
@Override
public DataSource apply(Void input) {
- if (datasource == null) {
- datasource = super.dataSourceProviderFn.apply(null);
- }
- return datasource;
+ return config.buildDatasource();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ config.populateDisplayData(builder);
}
}
}