You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/10/24 07:27:37 UTC

nifi git commit: NIFI-5739: Maintain CaptureChangeMySQL JDBC connection automatically

Repository: nifi
Updated Branches:
  refs/heads/master ebead820f -> d28b1172d


NIFI-5739: Maintain CaptureChangeMySQL JDBC connection automatically

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #3103.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d28b1172
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d28b1172
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d28b1172

Branch: refs/heads/master
Commit: d28b1172db974cb1bd6aeba479b7655ce89c42db
Parents: ebead82
Author: Koji Kawamura <ij...@apache.org>
Authored: Tue Oct 23 15:38:24 2018 +0900
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Oct 24 09:26:41 2018 +0200

----------------------------------------------------------------------
 .../mysql/processors/CaptureChangeMySQL.java    | 77 +++++++++++++++-----
 .../processors/CaptureChangeMySQLTest.groovy    |  3 +-
 2 files changed, 61 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d28b1172/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index f58ed7e..e8c94d1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer();
     private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer();
 
-    private Connection jdbcConnection = null;
+    private JDBCConnectionHolder jdbcConnectionHolder = null;
 
     private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
     private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
@@ -710,9 +710,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         }
 
         if (createEnrichmentConnection) {
+            jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, null, connectTimeout);
             try {
-                jdbcConnection = getJdbcConnection(driverLocation, driverName, connectedHost, username, password, null);
-            } catch (InitializationException | SQLException e) {
+                // Ensure connection can be created.
+                getJdbcConnection();
+            } catch (SQLException e) {
                 binlogClient.disconnect();
                 binlogClient = null;
                 throw new IOException("Error creating binlog enrichment JDBC connection to any of the specified hosts", e);
@@ -945,6 +947,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             throw new CDCException("Error closing CDC connection", e);
         } finally {
             binlogClient = null;
+
+            if (jdbcConnectionHolder != null) {
+                jdbcConnectionHolder.close();
+            }
         }
     }
 
@@ -998,8 +1004,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
      */
     protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
         TableInfo tableInfo = null;
-        if (jdbcConnection != null) {
-            try (Statement s = jdbcConnection.createStatement()) {
+        if (jdbcConnectionHolder != null) {
+
+            try (Statement s = getJdbcConnection().createStatement()) {
                 s.execute("USE `" + key.getDatabaseName() + "`");
                 ResultSet rs = s.executeQuery("SELECT * FROM `" + key.getTableName() + "` LIMIT 0");
                 ResultSetMetaData rsmd = rs.getMetaData();
@@ -1018,23 +1025,59 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         return tableInfo;
     }
 
+    protected Connection getJdbcConnection() throws SQLException {
+        return jdbcConnectionHolder.getConnection(); // the holder reuses its connection while valid, else opens a new one; jdbcConnectionHolder must be non-null
+    }
+
+    /** Holds the single JDBC connection used for binlog enrichment; it is validated on every request and re-created when no longer usable. */
+    private class JDBCConnectionHolder {
+        private final String connectionUrl;
+        private final Properties connectionProps = new Properties();
+        private final int validationTimeoutSeconds;
+        private Connection connection; // null until the first getConnection() and again after close()
+
+        private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) {
+            this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort();
+            if (customProperties != null) {
+                connectionProps.putAll(customProperties);
+            }
+            connectionProps.put("user", username);
+            connectionProps.put("password", password);
+            // Connection.isValid() takes whole seconds and treats 0 as "no timeout": round up and clamp so a sub-second timeout is not truncated to 0.
+            this.validationTimeoutSeconds = (int) Math.max(0L, Math.min(Integer.MAX_VALUE, (connectionTimeoutMillis + 999L) / 1000L));
+        }
+
+        /** Returns the held connection if it still passes a validity check; otherwise closes it and opens (and remembers) a new one. */
+        private Connection getConnection() throws SQLException {
+            if (connection != null && connection.isValid(validationTimeoutSeconds)) {
+                getLogger().trace("Returning the pooled JDBC connection.");
+                return connection;
+            }
+            close(); // Close the existing connection just in case.
+            getLogger().trace("Creating a new JDBC connection.");
+            connection = DriverManager.getConnection(connectionUrl, connectionProps);
+            return connection;
+        }
+
+        private void close() {
+            if (connection != null) {
+                try {
+                    getLogger().trace("Closing the pooled JDBC connection.");
+                    connection.close();
+                } catch (SQLException e) {
+                    getLogger().warn("Failed to close JDBC connection due to " + e, e);
+                }
+                connection = null; // drop the reference even if close() failed, so a dead connection is never re-validated or re-closed
+            }
+        }
+    }
+
+
     /**
      * using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR.
      *
      * @throws InitializationException if there is a problem obtaining the ClassLoader
      */
-    protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
-            throws InitializationException, SQLException {
-        Properties connectionProps = new Properties();
-        if (customProperties != null) {
-            connectionProps.putAll(customProperties);
-        }
-        connectionProps.put("user", username);
-        connectionProps.put("password", password);
-
-        return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps);
-    }
-
     protected void registerDriver(String locationString, String drvName) throws InitializationException {
         if (locationString != null && locationString.length() > 0) {
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/d28b1172/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 7e8607d..5b07850 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -967,8 +967,7 @@ class CaptureChangeMySQLTest {
         }
 
         @Override
-        protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
-                throws InitializationException, SQLException {
+        protected Connection getJdbcConnection() throws SQLException {
             Connection mockConnection = mock(Connection)
             Statement mockStatement = mock(Statement)
             when(mockConnection.createStatement()).thenReturn(mockStatement)