You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/27 01:33:13 UTC
[1/3] nifi git commit: NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL
Repository: nifi
Updated Branches:
refs/heads/master 24bb8cf95 -> 097548da9
NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL
This closes #1702.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da0454d8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da0454d8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da0454d8
Branch: refs/heads/master
Commit: da0454d80f5c0e5b0268dbd279a80d21d62e7c85
Parents: 24bb8cf
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Apr 26 15:19:12 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Apr 27 08:35:34 2017 +0900
----------------------------------------------------------------------
.../mysql/processors/CaptureChangeMySQL.java | 4 +++
.../processors/CaptureChangeMySQLTest.groovy | 27 ++++++++++++++++++++
2 files changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index a8c3336..e5bc8e0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -808,6 +808,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (cacheClient != null) {
cacheClient.removeByPattern(this.getIdentifier() + ".*");
}
+ // If not in a transaction, commit the session so the DDL event(s) will be transferred
+ if (includeDDLEvents && !inTransaction) {
+ session.commit();
+ }
}
}
break;
http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index eb1f32b..1e80383 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -760,6 +760,33 @@ class CaptureChangeMySQLTest {
}
+ @Test
+ void testDDLOutsideTransaction() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+ testRunner.run(1, false, true)
+
+ // ROTATE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+ ))
+
+ // DROP TABLE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'DROP TABLE myTable'] as QueryEventData
+ ))
+
+ testRunner.run(1, false, false)
+ testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
+ }
+
/********************************
* Mock and helper classes below
********************************/
[2/3] nifi git commit: NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL
Posted by ij...@apache.org.
NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL
This closes #1701.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d66eac2e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d66eac2e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d66eac2e
Branch: refs/heads/master
Commit: d66eac2ea10047d0e12eadb597333e5095076953
Parents: da0454d
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Apr 26 14:53:21 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Apr 27 08:42:01 2017 +0900
----------------------------------------------------------------------
.../mysql/processors/CaptureChangeMySQL.java | 1 +
.../processors/CaptureChangeMySQLTest.groovy | 40 ++++++++++++++++++++
2 files changed, 41 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index e5bc8e0..96be0c9 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -797,6 +797,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|| normalizedQuery.startsWith("alter ignore table")
|| normalizedQuery.startsWith("create table")
|| normalizedQuery.startsWith("truncate table")
+ || normalizedQuery.startsWith("rename table")
|| normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 1e80383..8106e40 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -787,6 +787,46 @@ class CaptureChangeMySQLTest {
testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
}
+ @Test
+ void testRenameTable() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+ testRunner.run(1, false, true)
+
+ // ROTATE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+ ))
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // RENAME TABLE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'RENAME TABLE myTable TO myTable2'] as QueryEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ assertEquals(1, resultFiles.size())
+ }
+
/********************************
* Mock and helper classes below
********************************/
[3/3] nifi git commit: NIFI-3745: Fixed Table caching / primary key logic in PutDatabaseRecord
Posted by ij...@apache.org.
NIFI-3745: Fixed Table caching / primary key logic in PutDatabaseRecord
This closes #1700.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/097548da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/097548da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/097548da
Branch: refs/heads/master
Commit: 097548da9db218d251b43e73e2e189b89eef8510
Parents: d66eac2
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Apr 26 14:44:49 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Apr 27 08:43:13 2017 +0900
----------------------------------------------------------------------
.../processors/standard/PutDatabaseRecord.java | 52 ++++++-----------
.../standard/TestPutDatabaseRecord.groovy | 60 ++++++++++++++++++++
2 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/097548da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 2797205..fd414c4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -371,7 +371,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
- final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+ final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
// Get the statement type from the attribute if necessary
String statementType = statementTypeProperty;
@@ -470,7 +470,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
return;
}
- final boolean includePrimaryKeys = UPDATE_TYPE.equalsIgnoreCase(statementType) && updateKeys == null;
+ // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
+ // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
+ final boolean includePrimaryKeys = (updateKeys == null);
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
@@ -1056,53 +1058,33 @@ public class PutDatabaseRecord extends AbstractProcessor {
static class SchemaKey {
private final String catalog;
+ private final String schemaName;
private final String tableName;
- public SchemaKey(final String catalog, final String tableName) {
+ public SchemaKey(final String catalog, final String schemaName, final String tableName) {
this.catalog = catalog;
+ this.schemaName = schemaName;
this.tableName = tableName;
}
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
- result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
+ int result = catalog != null ? catalog.hashCode() : 0;
+ result = 31 * result + (schemaName != null ? schemaName.hashCode() : 0);
+ result = 31 * result + tableName.hashCode();
return result;
}
@Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
- final SchemaKey other = (SchemaKey) obj;
- if (catalog == null) {
- if (other.catalog != null) {
- return false;
- }
- } else if (!catalog.equals(other.catalog)) {
- return false;
- }
-
-
- if (tableName == null) {
- if (other.tableName != null) {
- return false;
- }
- } else if (!tableName.equals(other.tableName)) {
- return false;
- }
+ SchemaKey schemaKey = (SchemaKey) o;
- return true;
+ if (catalog != null ? !catalog.equals(schemaKey.catalog) : schemaKey.catalog != null) return false;
+ if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) : schemaKey.schemaName != null) return false;
+ return tableName.equals(schemaKey.tableName);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/097548da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 6224e0e..bb12fb4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -345,6 +345,66 @@ class TestPutDatabaseRecord {
}
@Test
+ void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable("PERSONS", createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord(1, 'rec1', 101)
+ parser.addRecord(2, 'rec2', 102)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ final Connection conn = dbcp.getConnection()
+ Statement stmt = conn.createStatement()
+ stmt = conn.createStatement()
+ ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertEquals(101, rs.getInt(3))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertEquals(102, rs.getInt(3))
+ assertFalse(rs.next())
+ stmt.close()
+ runner.clearTransferState()
+
+ parser.addRecord(1, 'rec1', 201)
+ parser.addRecord(2, 'rec2', 202)
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
+ runner.enqueue(new byte[0])
+ runner.run(1,true,false)
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ stmt = conn.createStatement()
+ rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertEquals(201, rs.getInt(3))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertEquals(202, rs.getInt(3))
+ assertFalse(rs.next())
+ stmt.close()
+ conn.close()
+ }
+
+ @Test
void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
final MockRecordParser parser = new MockRecordParser()