You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/28 04:18:40 UTC

nifi git commit: NIFI-3749: Added database filtering to DDL events in CaptureChangeMySQL

Repository: nifi
Updated Branches:
  refs/heads/master 2664ea093 -> aa4efb43c


NIFI-3749: Added database filtering to DDL events in CaptureChangeMySQL

This closes #1708.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aa4efb43
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aa4efb43
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aa4efb43

Branch: refs/heads/master
Commit: aa4efb43ca5dc85b55bc39c180ca447f4e9e3e01
Parents: 2664ea0
Author: Matt Burgess <ma...@apache.org>
Authored: Thu Apr 27 10:30:47 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Apr 28 13:16:01 2017 +0900

----------------------------------------------------------------------
 .../mysql/event/BeginTransactionEventInfo.java  |  9 ++-
 .../mysql/event/CommitTransactionEventInfo.java |  9 ++-
 .../event/io/BeginTransactionEventWriter.java   | 11 +++
 .../event/io/CommitTransactionEventWriter.java  | 11 ++-
 .../mysql/processors/CaptureChangeMySQL.java    | 32 ++++++---
 .../processors/CaptureChangeMySQLTest.groovy    | 71 ++++++++++++++++++++
 6 files changed, 129 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
index adf96be..c6f5af0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
@@ -21,7 +21,14 @@ package org.apache.nifi.cdc.mysql.event;
  */
 public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
 
-    public BeginTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
+    private String databaseName;
+
+    public BeginTransactionEventInfo(String databaseName, Long timestamp, String binlogFilename, long binlogPosition) {
         super(BEGIN_EVENT, timestamp, binlogFilename, binlogPosition);
+        this.databaseName = databaseName;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
index 96a84d3..b4a230a 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
@@ -22,7 +22,14 @@ package org.apache.nifi.cdc.mysql.event;
  */
 public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
 
-    public CommitTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
+    private String databaseName;
+
+    public CommitTransactionEventInfo(String databaseName, Long timestamp, String binlogFilename, long binlogPosition) {
         super(COMMIT_EVENT, timestamp, binlogFilename, binlogPosition);
+        this.databaseName = databaseName;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
index b1e3511..ea1f5d2 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
@@ -18,8 +18,19 @@ package org.apache.nifi.cdc.mysql.event.io;
 
 import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
 
+import java.io.IOException;
+
 /**
  * A writer for events corresponding to the beginning of a MySQL transaction
  */
 public class BeginTransactionEventWriter extends AbstractBinlogEventWriter<BeginTransactionEventInfo> {
+
+    protected void writeJson(BeginTransactionEventInfo event) throws IOException {
+        super.writeJson(event);
+        if (event.getDatabaseName() != null) {
+            jsonGenerator.writeStringField("database", event.getDatabaseName());
+        } else {
+            jsonGenerator.writeNullField("database");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
index 58b77a9..c69b4b2 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
@@ -19,9 +19,18 @@ package org.apache.nifi.cdc.mysql.event.io;
 
 import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
 
+import java.io.IOException;
+
 /**
  * A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
  */
 public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
-
+    protected void writeJson(CommitTransactionEventInfo event) throws IOException {
+        super.writeJson(event);
+        if (event.getDatabaseName() != null) {
+            jsonGenerator.writeStringField("database", event.getDatabaseName());
+        } else {
+            jsonGenerator.writeNullField("database");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index c328835..037882d 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -149,8 +149,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-db-name-pattern")
             .displayName("Database/Schema Name Pattern")
-            .description("A regular expression (regex) for matching databases or schemas (depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
-                    + "the schema name as it is stored in the database. If the property is not set, the schema name will not be used to filter the CDC events.")
+            .description("A regular expression (regex) for matching databases (or schemas, depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
+                    + "the database name as it is stored in the RDBMS. If the property is not set, the database name will not be used to filter the CDC events. "
+                    + "NOTE: DDL events, even if they affect different databases, are associated with the database used by the session to execute the DDL. "
+                    + "This means if a connection is made to one database, but the DDL is issued against another, then the connected database will be the one matched against "
+                    + "the specified pattern.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -351,6 +354,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     private volatile long xactSequenceId = 0;
 
     private volatile TableInfo currentTable = null;
+    private volatile String currentDatabase = null;
     private volatile Pattern databaseNamePattern;
     private volatile Pattern tableNamePattern;
     private volatile boolean includeBeginCommit = false;
@@ -759,9 +763,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                     }
                     break;
                 case QUERY:
-                    // Is this the start of a transaction?
                     QueryEventData queryEventData = event.getData();
+                    currentDatabase = queryEventData.getDatabase();
+
                     String sql = queryEventData.getSql();
+
+                    // Is this the start of a transaction?
                     if ("BEGIN".equals(sql)) {
                         // If we're already in a transaction, something bad happened, alert the user
                         if (inTransaction) {
@@ -772,8 +779,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         xactBinlogPosition = currentBinlogPosition;
                         xactSequenceId = currentSequenceId.get();
 
-                        if (includeBeginCommit) {
-                            BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+                        if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+                            BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                             currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
                         }
                         inTransaction = true;
@@ -783,8 +790,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                     + "This could indicate that your binlog position is invalid.");
                         }
                         // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
-                        if (includeBeginCommit) {
-                            CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+                        if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+                            CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                             currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
                         }
                         // Commit the NiFi session
@@ -803,8 +810,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                 || normalizedQuery.startsWith("drop table")
                                 || normalizedQuery.startsWith("drop database")) {
 
-                            if (includeDDLEvents) {
-                                DDLEventInfo ddlEvent = new DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
+                            if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+                                // If we don't have table information, we can still use the database name
+                                TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
+                                DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
                                 currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
                             }
                             // Remove all the keys from the cache that this processor added
@@ -824,14 +833,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
                                 + "This could indicate that your binlog position is invalid.");
                     }
-                    if (includeBeginCommit) {
-                        CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+                    if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
+                        CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
                         currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
                     }
                     // Commit the NiFi session
                     session.commit();
                     inTransaction = false;
                     currentTable = null;
+                    currentDatabase = null;
                     break;
 
                 case WRITE_ROWS:

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa4efb43/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index a8edb3a..3eb1f17 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -27,6 +27,7 @@ import com.github.shyiko.mysql.binlog.event.RotateEventData
 import com.github.shyiko.mysql.binlog.event.TableMapEventData
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
+import groovy.json.JsonSlurper
 import org.apache.commons.io.output.WriterOutputStream
 import org.apache.nifi.cdc.mysql.MockBinlogClient
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
@@ -669,6 +670,76 @@ class CaptureChangeMySQLTest {
     }
 
     @Test
+    void testFilterDatabase() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
+                [database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        ////////////////////////
+        // Test database filter
+        ////////////////////////
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'NotMyDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
+                [database: 'NotMyDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        // Only the first transaction (BEGIN + DDL + COMMIT on myDB) should be emitted; the NotMyDB events are filtered out
+        assertEquals(3, resultFiles.size())
+
+        // Check that the database name is set on the objects
+        resultFiles.each {f ->
+            def json = new JsonSlurper().parseText(new String(f.toByteArray()))
+            assertEquals('myDB', json.database)
+        }
+    }
+
+    @Test
     void testTransactionAcrossMultipleProcessorExecutions() throws Exception {
         testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
         testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')