You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2022/12/19 16:31:04 UTC

[nifi] branch main updated: NIFI-4572: Include database and table names in CaptureChangeMySQL events even when cache is not configured

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a88980024 NIFI-4572: Include database and table names in CaptureChangeMySQL events even when cache is not configured
2a88980024 is described below

commit 2a88980024d79f89abb28cc0b13f9cac753d7a0f
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Dec 15 10:04:34 2022 -0500

    NIFI-4572: Include database and table names in CaptureChangeMySQL events even when cache is not configured
    
    This closes #6786.
    
    Signed-off-by: Tamas Palfy <tp...@apache.org>
---
 .../org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java  | 8 ++++++--
 .../nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy       | 4 ++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 08dad6dc40..978b8bc8d7 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -282,8 +282,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-dist-map-cache-client")
             .displayName("Distributed Map Cache Client")
-            .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various tables, columns, etc. "
-                    + "needed by the processor. If a client is not specified, the generated events will not include column type or name information.")
+            .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various table columns, datatypes, etc. "
+                    + "needed by the processor. If a client is not specified, the generated events will not include column type or name information (but they will include database "
+                    + "and table information.")
             .identifiesControllerService(DistributedMapCacheClient.class)
             .required(false)
             .build();
@@ -918,6 +919,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                     throw new IOException(se.getMessage(), se);
                                 }
                             }
+                        } else {
+                            // Populate a limited version of TableInfo without column information
+                            currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList());
                         }
                     } else {
                         // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index cc03e2aaa6..66828c3570 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -262,6 +262,10 @@ class CaptureChangeMySQLTest {
         def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
         assertEquals(1, resultFiles.size())
         assertEquals('10', resultFiles[0].getAttribute(EventWriter.SEQUENCE_ID_KEY))
+        // Verify the contents of the event includes the database and table name even though the cache is not configured
+        def json = new JsonSlurper().parseText(resultFiles[0].getContent())
+        assertEquals('myDB', json['database'])
+        assertEquals('myTable', json['table_name'])
     }
 
     @Test