You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/28 04:18:40 UTC
nifi git commit: NIFI-3749: Added database filtering to DDL events in CaptureChangeMySQL
Repository: nifi
Updated Branches:
refs/heads/master 2664ea093 -> aa4efb43c
NIFI-3749: Added database filtering to DDL events in CaptureChangeMySQL
This closes #1708.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aa4efb43
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aa4efb43
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aa4efb43
Branch: refs/heads/master
Commit: aa4efb43ca5dc85b55bc39c180ca447f4e9e3e01
Parents: 2664ea0
Author: Matt Burgess <ma...@apache.org>
Authored: Thu Apr 27 10:30:47 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Apr 28 13:16:01 2017 +0900
----------------------------------------------------------------------
.../mysql/event/BeginTransactionEventInfo.java | 9 ++-
.../mysql/event/CommitTransactionEventInfo.java | 9 ++-
.../event/io/BeginTransactionEventWriter.java | 11 +++
.../event/io/CommitTransactionEventWriter.java | 11 ++-
.../mysql/processors/CaptureChangeMySQL.java | 32 ++++++---
.../processors/CaptureChangeMySQLTest.groovy | 71 ++++++++++++++++++++
6 files changed, 129 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
index adf96be..c6f5af0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
@@ -21,7 +21,14 @@ package org.apache.nifi.cdc.mysql.event;
*/
public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
- public BeginTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
+ private String databaseName;
+
+ public BeginTransactionEventInfo(String databaseName, Long timestamp, String binlogFilename, long binlogPosition) {
super(BEGIN_EVENT, timestamp, binlogFilename, binlogPosition);
+ this.databaseName = databaseName;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
index 96a84d3..b4a230a 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
@@ -22,7 +22,14 @@ package org.apache.nifi.cdc.mysql.event;
*/
public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
- public CommitTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
+ private String databaseName;
+
+ public CommitTransactionEventInfo(String databaseName, Long timestamp, String binlogFilename, long binlogPosition) {
super(COMMIT_EVENT, timestamp, binlogFilename, binlogPosition);
+ this.databaseName = databaseName;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
index b1e3511..ea1f5d2 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
@@ -18,8 +18,19 @@ package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
+import java.io.IOException;
+
/**
* A writer for events corresponding to the beginning of a MySQL transaction
*/
public class BeginTransactionEventWriter extends AbstractBinlogEventWriter<BeginTransactionEventInfo> {
+
+ protected void writeJson(BeginTransactionEventInfo event) throws IOException {
+ super.writeJson(event);
+ if (event.getDatabaseName() != null) {
+ jsonGenerator.writeStringField("database", event.getDatabaseName());
+ } else {
+ jsonGenerator.writeNullField("database");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
index 58b77a9..c69b4b2 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
@@ -19,9 +19,18 @@ package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import java.io.IOException;
+
/**
* A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
*/
public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
-
+ protected void writeJson(CommitTransactionEventInfo event) throws IOException {
+ super.writeJson(event);
+ if (event.getDatabaseName() != null) {
+ jsonGenerator.writeStringField("database", event.getDatabaseName());
+ } else {
+ jsonGenerator.writeNullField("database");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index c328835..037882d 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -149,8 +149,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
.name("capture-change-mysql-db-name-pattern")
.displayName("Database/Schema Name Pattern")
- .description("A regular expression (regex) for matching databases or schemas (depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
- + "the schema name as it is stored in the database. If the property is not set, the schema name will not be used to filter the CDC events.")
+ .description("A regular expression (regex) for matching databases (or schemas, depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
+ + "the database name as it is stored in the RDBMS. If the property is not set, the database name will not be used to filter the CDC events. "
+ + "NOTE: DDL events, even if they affect different databases, are associated with the database used by the session to execute the DDL. "
+ + "This means if a connection is made to one database, but the DDL is issued against another, then the connected database will be the one matched against "
+ + "the specified pattern.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -351,6 +354,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private volatile long xactSequenceId = 0;
private volatile TableInfo currentTable = null;
+ private volatile String currentDatabase = null;
private volatile Pattern databaseNamePattern;
private volatile Pattern tableNamePattern;
private volatile boolean includeBeginCommit = false;
@@ -759,9 +763,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
break;
case QUERY:
- // Is this the start of a transaction?
QueryEventData queryEventData = event.getData();
+ currentDatabase = queryEventData.getDatabase();
+
String sql = queryEventData.getSql();
+
+ // Is this the start of a transaction?
if ("BEGIN".equals(sql)) {
// If we're already in a transaction, something bad happened, alert the user
if (inTransaction) {
@@ -772,8 +779,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
xactBinlogPosition = currentBinlogPosition;
xactSequenceId = currentSequenceId.get();
- if (includeBeginCommit) {
- BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+ if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+ BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
}
inTransaction = true;
@@ -783,8 +790,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
+ "This could indicate that your binlog position is invalid.");
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
- if (includeBeginCommit) {
- CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+ if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+ CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
@@ -803,8 +810,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|| normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) {
- if (includeDDLEvents) {
- DDLEventInfo ddlEvent = new DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
+ if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+ // If we don't have table information, we can still use the database name
+ TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
+ DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Remove all the keys from the cache that this processor added
@@ -824,14 +833,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
}
- if (includeBeginCommit) {
- CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+ if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+ CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
session.commit();
inTransaction = false;
currentTable = null;
+ currentDatabase = null;
break;
case WRITE_ROWS:
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index a8edb3a..3eb1f17 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -27,6 +27,7 @@ import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
+import groovy.json.JsonSlurper
import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
@@ -669,6 +670,76 @@ class CaptureChangeMySQLTest {
}
@Test
+ void testFilterDatabase() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+ testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+ testRunner.run(1, false, true)
+
+ // ROTATE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+ ))
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
+ [database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ ////////////////////////
+ // Test database filter
+ ////////////////////////
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'NotMyDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
+ [database: 'NotMyDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ // First BEGIN + DDL + COMMIT only
+ assertEquals(3, resultFiles.size())
+
+ // Check that the database name is set on the objects
+ resultFiles.each {f ->
+ def json = new JsonSlurper().parseText(new String(f.toByteArray()))
+ assertEquals('myDB', json.database)
+ }
+ }
+
+ @Test
void testTransactionAcrossMultipleProcessorExecutions() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')