You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2023/02/27 13:08:56 UTC

[nifi] branch main updated: NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 61b87e007b NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true
61b87e007b is described below

commit 61b87e007bfd009d41712db2c01b4b9d9c1f161e
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Feb 23 15:56:18 2023 -0500

    NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #6986.
---
 .../nifi/processors/hive/UpdateHive3Table.java     |  1 +
 .../nifi/processors/hive/TestUpdateHive3Table.java | 40 ++++++++++++++++++
 .../processors/standard/UpdateDatabaseTable.java   |  1 +
 .../standard/TestUpdateDatabaseTable.java          | 49 ++++++++++++++++++++++
 4 files changed, 91 insertions(+)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
index ba53d6e29d..5e00e0904f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
@@ -646,6 +646,7 @@ public class UpdateHive3Table extends AbstractProcessor {
                     if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
                         // The field does not exist in the table (and is not a partition column), add it
                         columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
+                        hiveColumns.add(recordFieldName);
                         getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
                     }
                 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
index 4a57097055..a4027c1272 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
@@ -493,6 +493,46 @@ public class TestUpdateHive3Table {
         assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
     }
 
+    @Test
+    public void testAddColumnUpdateFields() throws Exception {
+        configure(processor, 1);
+        runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
+        final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
+        runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country");
+
+        RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
+        runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        HashMap<String,String> attrs = new HashMap<>();
+        attrs.put("continent", "Asia");
+        attrs.put("country", "China");
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+
+        runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "messages");
+        flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");
+        List<String> statements = service.getExecutedStatements();
+        assertEquals(2, statements.size());
+        // All columns from users table/data should be added to the table, and a new partition should be added
+        assertEquals("ALTER TABLE `messages` ADD COLUMNS (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE)",
+                statements.get(0));
+        assertEquals("ALTER TABLE `messages` ADD IF NOT EXISTS PARTITION (`continent`='Asia', `country`='China')",
+                statements.get(1));
+
+        // The input reader is for a different table, so none of the columns match. This results in an empty output FlowFile
+        flowFile.assertContentEquals("");
+    }
+
     private static final class MockUpdateHive3Table extends UpdateHive3Table {
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
index 8261bc86ec..04d07f7bae 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
@@ -520,6 +520,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
                         ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
                                 recordField.getDefaultValue() != null, null, recordField.isNullable());
                         columnsToAdd.add(columnToAdd);
+                        dbColumns.add(recordFieldName);
                         getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
                     }
                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
index 058c9f6555..4a8d6c129e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
@@ -21,6 +21,7 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
@@ -359,6 +360,54 @@ public class TestUpdateDatabaseTable {
         }
     }
 
+    @Test
+    public void testAddColumnToExistingTableUpdateFieldNames() throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+
+            MockRecordParser readerFactory = new MockRecordParser();
+
+            readerFactory.addSchemaField(new RecordField("id", RecordFieldType.INT.getDataType(), false));
+            readerFactory.addSchemaField(new RecordField("name", RecordFieldType.STRING.getDataType(), true));
+            readerFactory.addSchemaField(new RecordField("code", RecordFieldType.INT.getDataType(), 0, true));
+            readerFactory.addSchemaField(new RecordField("newField", RecordFieldType.STRING.getDataType(), 0, true));
+            readerFactory.addRecord(1, "name1", null, "test");
+
+            runner.addControllerService("mock-reader-factory", readerFactory);
+            runner.enableControllerService(readerFactory);
+
+            runner.setProperty(UpdateDatabaseTable.RECORD_READER, "mock-reader-factory");
+            runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}");
+            runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
+            runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, "true");
+            runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, "false");
+            runner.setProperty(UpdateDatabaseTable.UPDATE_FIELD_NAMES, "true");
+
+            MockRecordWriter writerFactory = new MockRecordWriter();
+            runner.addControllerService("mock-writer-factory", writerFactory);
+            runner.enableControllerService(writerFactory);
+            runner.setProperty(UpdateDatabaseTable.RECORD_WRITER_FACTORY, "mock-writer-factory");
+
+            runner.setProperty(UpdateDatabaseTable.DB_TYPE, new DerbyDatabaseAdapter().getName());
+            runner.addControllerService("dbcp", service);
+            runner.enableControllerService(service);
+            runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+            Map<String, String> attrs = new HashMap<>();
+            attrs.put("db.name", "default");
+            attrs.put("table.name", "persons");
+            runner.enqueue(new byte[0], attrs);
+            runner.run();
+
+            runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
+            final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
+            // Ensure the additional field is written out to the FlowFile
+            flowFile.assertContentEquals("\"1\",\"name1\",\"0\",\"test\"\n");
+        }
+    }
+
     /**
      * Simple implementation only for testing purposes
      */