You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/10/24 07:27:37 UTC
nifi git commit: NIFI-5739: Maintain CaptureChangeMySQL JDBC
connection automatically
Repository: nifi
Updated Branches:
refs/heads/master ebead820f -> d28b1172d
NIFI-5739: Maintain CaptureChangeMySQL JDBC connection automatically
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #3103.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d28b1172
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d28b1172
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d28b1172
Branch: refs/heads/master
Commit: d28b1172db974cb1bd6aeba479b7655ce89c42db
Parents: ebead82
Author: Koji Kawamura <ij...@apache.org>
Authored: Tue Oct 23 15:38:24 2018 +0900
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Oct 24 09:26:41 2018 +0200
----------------------------------------------------------------------
.../mysql/processors/CaptureChangeMySQL.java | 77 +++++++++++++++-----
.../processors/CaptureChangeMySQLTest.groovy | 3 +-
2 files changed, 61 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d28b1172/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index f58ed7e..e8c94d1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer();
private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer();
- private Connection jdbcConnection = null;
+ private JDBCConnectionHolder jdbcConnectionHolder = null;
private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
@@ -710,9 +710,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
if (createEnrichmentConnection) {
+ jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, null, connectTimeout);
try {
- jdbcConnection = getJdbcConnection(driverLocation, driverName, connectedHost, username, password, null);
- } catch (InitializationException | SQLException e) {
+ // Ensure connection can be created.
+ getJdbcConnection();
+ } catch (SQLException e) {
binlogClient.disconnect();
binlogClient = null;
throw new IOException("Error creating binlog enrichment JDBC connection to any of the specified hosts", e);
@@ -945,6 +947,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
throw new CDCException("Error closing CDC connection", e);
} finally {
binlogClient = null;
+
+ if (jdbcConnectionHolder != null) {
+ jdbcConnectionHolder.close();
+ }
}
}
@@ -998,8 +1004,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
*/
protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
TableInfo tableInfo = null;
- if (jdbcConnection != null) {
- try (Statement s = jdbcConnection.createStatement()) {
+ if (jdbcConnectionHolder != null) {
+
+ try (Statement s = getJdbcConnection().createStatement()) {
s.execute("USE `" + key.getDatabaseName() + "`");
ResultSet rs = s.executeQuery("SELECT * FROM `" + key.getTableName() + "` LIMIT 0");
ResultSetMetaData rsmd = rs.getMetaData();
@@ -1018,23 +1025,59 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
return tableInfo;
}
+ protected Connection getJdbcConnection() throws SQLException {
+ return jdbcConnectionHolder.getConnection();
+ }
+
+ private class JDBCConnectionHolder {
+ private String connectionUrl;
+ private Properties connectionProps = new Properties();
+ private long connectionTimeoutMillis;
+
+ private Connection connection;
+
+ private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) {
+ this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort();
+ if (customProperties != null) {
+ connectionProps.putAll(customProperties);
+ }
+ connectionProps.put("user", username);
+ connectionProps.put("password", password);
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
+ }
+
+ private Connection getConnection() throws SQLException {
+ if (connection != null && connection.isValid((int) (connectionTimeoutMillis / 1000))) {
+ getLogger().trace("Returning the pooled JDBC connection.");
+ return connection;
+ }
+
+ // Close the existing connection just in case.
+ close();
+
+ getLogger().trace("Creating a new JDBC connection.");
+ connection = DriverManager.getConnection(connectionUrl, connectionProps);
+ return connection;
+ }
+
+ private void close() {
+ if (connection != null) {
+ try {
+ getLogger().trace("Closing the pooled JDBC connection.");
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().warn("Failed to close JDBC connection due to " + e, e);
+ }
+ }
+ }
+ }
+
+
/**
* using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR.
*
* @throws InitializationException if there is a problem obtaining the ClassLoader
*/
- protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
- throws InitializationException, SQLException {
- Properties connectionProps = new Properties();
- if (customProperties != null) {
- connectionProps.putAll(customProperties);
- }
- connectionProps.put("user", username);
- connectionProps.put("password", password);
-
- return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps);
- }
-
protected void registerDriver(String locationString, String drvName) throws InitializationException {
if (locationString != null && locationString.length() > 0) {
try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/d28b1172/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 7e8607d..5b07850 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -967,8 +967,7 @@ class CaptureChangeMySQLTest {
}
@Override
- protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
- throws InitializationException, SQLException {
+ protected Connection getJdbcConnection() throws SQLException {
Connection mockConnection = mock(Connection)
Statement mockStatement = mock(Statement)
when(mockConnection.createStatement()).thenReturn(mockStatement)