You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this text copy.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/27 01:33:13 UTC

[1/3] nifi git commit: NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL

Repository: nifi
Updated Branches:
  refs/heads/master 24bb8cf95 -> 097548da9


NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL

This closes #1702.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da0454d8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da0454d8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da0454d8

Branch: refs/heads/master
Commit: da0454d80f5c0e5b0268dbd279a80d21d62e7c85
Parents: 24bb8cf
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Apr 26 15:19:12 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Apr 27 08:35:34 2017 +0900

----------------------------------------------------------------------
 .../mysql/processors/CaptureChangeMySQL.java    |  4 +++
 .../processors/CaptureChangeMySQLTest.groovy    | 27 ++++++++++++++++++++
 2 files changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index a8c3336..e5bc8e0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -808,6 +808,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             if (cacheClient != null) {
                                 cacheClient.removeByPattern(this.getIdentifier() + ".*");
                             }
+                            // If not in a transaction, commit the session so the DDL event(s) will be transferred
+                            if (includeDDLEvents && !inTransaction) {
+                                session.commit();
+                            }
                         }
                     }
                     break;

http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index eb1f32b..1e80383 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -760,6 +760,33 @@ class CaptureChangeMySQLTest {
 
     }
 
+    @Test
+    void testDDLOutsideTransaction() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // DROP TABLE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'DROP TABLE myTable'] as QueryEventData
+        ))
+
+        testRunner.run(1, false, false)
+        testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
+    }
+
     /********************************
      * Mock and helper classes below
      ********************************/


[2/3] nifi git commit: NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL

Posted by ij...@apache.org.
NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL

This closes #1701.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d66eac2e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d66eac2e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d66eac2e

Branch: refs/heads/master
Commit: d66eac2ea10047d0e12eadb597333e5095076953
Parents: da0454d
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Apr 26 14:53:21 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Apr 27 08:42:01 2017 +0900

----------------------------------------------------------------------
 .../mysql/processors/CaptureChangeMySQL.java    |  1 +
 .../processors/CaptureChangeMySQLTest.groovy    | 40 ++++++++++++++++++++
 2 files changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index e5bc8e0..96be0c9 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -797,6 +797,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                 || normalizedQuery.startsWith("alter ignore table")
                                 || normalizedQuery.startsWith("create table")
                                 || normalizedQuery.startsWith("truncate table")
+                                || normalizedQuery.startsWith("rename table")
                                 || normalizedQuery.startsWith("drop table")
                                 || normalizedQuery.startsWith("drop database")) {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 1e80383..8106e40 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -787,6 +787,46 @@ class CaptureChangeMySQLTest {
         testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
     }
 
+    @Test
+    void testRenameTable() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // RENAME TABLE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'RENAME TABLE myTable TO myTable2'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        assertEquals(1, resultFiles.size())
+    }
+
     /********************************
      * Mock and helper classes below
      ********************************/


[3/3] nifi git commit: NIFI-3745: Fixed Table caching / primary key logic in PutDatabaseRecord

Posted by ij...@apache.org.
NIFI-3745: Fixed Table caching / primary key logic in PutDatabaseRecord

This closes #1700.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/097548da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/097548da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/097548da

Branch: refs/heads/master
Commit: 097548da9db218d251b43e73e2e189b89eef8510
Parents: d66eac2
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Apr 26 14:44:49 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Apr 27 08:43:13 2017 +0900

----------------------------------------------------------------------
 .../processors/standard/PutDatabaseRecord.java  | 52 ++++++-----------
 .../standard/TestPutDatabaseRecord.groovy       | 60 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/097548da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 2797205..fd414c4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -371,7 +371,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
-                final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+                final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
 
                 // Get the statement type from the attribute if necessary
                 String statementType = statementTypeProperty;
@@ -470,7 +470,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                 return;
                             }
 
-                            final boolean includePrimaryKeys = UPDATE_TYPE.equalsIgnoreCase(statementType) && updateKeys == null;
+                            // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
+                            // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
+                            final boolean includePrimaryKeys = (updateKeys == null);
 
                             // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
                             // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
@@ -1056,53 +1058,33 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
     static class SchemaKey {
         private final String catalog;
+        private final String schemaName;
         private final String tableName;
 
-        public SchemaKey(final String catalog, final String tableName) {
+        public SchemaKey(final String catalog, final String schemaName, final String tableName) {
             this.catalog = catalog;
+            this.schemaName = schemaName;
             this.tableName = tableName;
         }
 
         @Override
         public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
-            result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
+            int result = catalog != null ? catalog.hashCode() : 0;
+            result = 31 * result + (schemaName != null ? schemaName.hashCode() : 0);
+            result = 31 * result + tableName.hashCode();
             return result;
         }
 
         @Override
-        public boolean equals(final Object obj) {
-            if (this == obj) {
-                return true;
-            }
-            if (obj == null) {
-                return false;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
 
-            final SchemaKey other = (SchemaKey) obj;
-            if (catalog == null) {
-                if (other.catalog != null) {
-                    return false;
-                }
-            } else if (!catalog.equals(other.catalog)) {
-                return false;
-            }
-
-
-            if (tableName == null) {
-                if (other.tableName != null) {
-                    return false;
-                }
-            } else if (!tableName.equals(other.tableName)) {
-                return false;
-            }
+            SchemaKey schemaKey = (SchemaKey) o;
 
-            return true;
+            if (catalog != null ? !catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) return false;
+            if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) return false;
+            return tableName.equals(schemaKey.tableName);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/097548da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 6224e0e..bb12fb4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -345,6 +345,66 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 101)
+        parser.addRecord(2, 'rec2', 102)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        Statement stmt = conn.createStatement()
+        stmt = conn.createStatement()
+        ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(101, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(102, rs.getInt(3))
+        assertFalse(rs.next())
+        stmt.close()
+        runner.clearTransferState()
+
+        parser.addRecord(1, 'rec1', 201)
+        parser.addRecord(2, 'rec2', 202)
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
+        runner.enqueue(new byte[0])
+        runner.run(1,true,false)
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        stmt = conn.createStatement()
+        rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(201, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(202, rs.getInt(3))
+        assertFalse(rs.next())
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
     void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException {
         recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
         final MockRecordParser parser = new MockRecordParser()