You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2023/02/27 13:08:56 UTC
[nifi] branch main updated: NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 61b87e007b NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true
61b87e007b is described below
commit 61b87e007bfd009d41712db2c01b4b9d9c1f161e
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Feb 23 15:56:18 2023 -0500
NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #6986.
---
.../nifi/processors/hive/UpdateHive3Table.java | 1 +
.../nifi/processors/hive/TestUpdateHive3Table.java | 40 ++++++++++++++++++
.../processors/standard/UpdateDatabaseTable.java | 1 +
.../standard/TestUpdateDatabaseTable.java | 49 ++++++++++++++++++++++
4 files changed, 91 insertions(+)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
index ba53d6e29d..5e00e0904f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/UpdateHive3Table.java
@@ -646,6 +646,7 @@ public class UpdateHive3Table extends AbstractProcessor {
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
+ hiveColumns.add(recordFieldName);
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
}
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
index 4a57097055..a4027c1272 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestUpdateHive3Table.java
@@ -493,6 +493,46 @@ public class TestUpdateHive3Table {
assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
}
+ @Test
+ public void testAddColumnUpdateFields() throws Exception {
+ configure(processor, 1);
+ runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
+ final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
+ runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country");
+
+ RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
+ runner.addControllerService("writer", recordWriter);
+ runner.enableControllerService(recordWriter);
+ runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true"); // the option this test exercises (NIFI-11209)
+ runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
+
+ runner.enqueue(new byte[0]); // first FlowFile: empty content, no attributes
+ runner.run();
+ HashMap<String,String> attrs = new HashMap<>();
+ attrs.put("continent", "Asia");
+ attrs.put("country", "China");
+ runner.enqueue(new byte[0], attrs); // second FlowFile: empty content plus the continent/country attributes
+ runner.run();
+
+ runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1); // NOTE(review): two FlowFiles were enqueued but one success is asserted; presumably the attribute-less one does not succeed - confirm
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "messages");
+ flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");
+ List<String> statements = service.getExecutedStatements();
+ assertEquals(2, statements.size()); // one ADD COLUMNS statement followed by one ADD PARTITION statement
+ // All columns from users table/data should be added to the table, and a new partition should be added
+ assertEquals("ALTER TABLE `messages` ADD COLUMNS (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE)",
+ statements.get(0));
+ assertEquals("ALTER TABLE `messages` ADD IF NOT EXISTS PARTITION (`continent`='Asia', `country`='China')",
+ statements.get(1));
+
+ // The input reader is for a different table, so none of the columns match. This results in an empty output FlowFile
+ flowFile.assertContentEquals("");
+ }
+
private static final class MockUpdateHive3Table extends UpdateHive3Table {
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
index 8261bc86ec..04d07f7bae 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
@@ -520,6 +520,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
recordField.getDefaultValue() != null, null, recordField.isNullable());
columnsToAdd.add(columnToAdd);
+ dbColumns.add(recordFieldName);
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
index 058c9f6555..4a8d6c129e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
@@ -21,6 +21,7 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
@@ -359,6 +360,54 @@ public class TestUpdateDatabaseTable {
}
}
+ @Test
+ public void testAddColumnToExistingTableUpdateFieldNames() throws Exception {
+ runner = TestRunners.newTestRunner(processor);
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons); // create the table up front; the createPersons statement is defined outside this hunk
+ }
+
+ MockRecordParser readerFactory = new MockRecordParser();
+
+ readerFactory.addSchemaField(new RecordField("id", RecordFieldType.INT.getDataType(), false));
+ readerFactory.addSchemaField(new RecordField("name", RecordFieldType.STRING.getDataType(), true));
+ readerFactory.addSchemaField(new RecordField("code", RecordFieldType.INT.getDataType(), 0, true)); // third argument is presumably the field's default value - confirm against RecordField
+ readerFactory.addSchemaField(new RecordField("newField", RecordFieldType.STRING.getDataType(), 0, true)); // presumably the column missing from the existing table - createPersons is not shown here
+ readerFactory.addRecord(1, "name1", null, "test"); // values in schema order: id, name, code (null), newField
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+
+ runner.setProperty(UpdateDatabaseTable.RECORD_READER, "mock-reader-factory");
+ runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}"); // presumably resolved from the "table.name" attribute set below - confirm
+ runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
+ runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, "true");
+ runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, "false");
+ runner.setProperty(UpdateDatabaseTable.UPDATE_FIELD_NAMES, "true"); // the option this test exercises (NIFI-11209)
+
+ MockRecordWriter writerFactory = new MockRecordWriter();
+ runner.addControllerService("mock-writer-factory", writerFactory);
+ runner.enableControllerService(writerFactory);
+ runner.setProperty(UpdateDatabaseTable.RECORD_WRITER_FACTORY, "mock-writer-factory");
+
+ runner.setProperty(UpdateDatabaseTable.DB_TYPE, new DerbyDatabaseAdapter().getName());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("db.name", "default");
+ attrs.put("table.name", "persons");
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+
+ runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
+ // Ensure the additional field is written out to the FlowFile
+ flowFile.assertContentEquals("\"1\",\"name1\",\"0\",\"test\"\n"); // NOTE(review): "0" appears where the record's code was null - presumably the declared default; confirm
+ }
+ }
+
/**
* Simple implementation only for testing purposes
*/