You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "mattyb149 (via GitHub)" <gi...@apache.org> on 2023/06/22 00:43:50 UTC

[GitHub] [nifi] mattyb149 opened a new pull request, #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

mattyb149 opened a new pull request, #7421:
URL: https://github.com/apache/nifi/pull/7421

   # Summary
   
   [NIFI-11739](https://issues.apache.org/jira/browse/NIFI-11739) This PR adds the "Unmatched Column Behavior" property to PutIceberg that is in other table-based processors such as PutDatabaseRecord. If a column exists in the target table but is not a field in the incoming records, use the data type from the target table and a value of null. If the property is set to Fail (the default and current behavior), the FlowFile will fail. If Warn, a log warning is issued. If Ignore, the aforementioned behavior is employed.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1272569504


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -375,4 +364,44 @@ void abort(DataFile[] dataFiles, Table table) {
                 .run(file -> table.io().deleteFile(file.path().toString()));
     }
 
+    public enum UnmatchedColumnBehavior implements DescribedValue {
+        IGNORE_UNMATCHED_COLUMN("IGNORE_UNMATCHED_COLUMN",

Review Comment:
   Instead of specifying the `value` as the first argument to the enum, you can use `name()` in the getValue() method.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1273229002


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java:
##########
@@ -139,16 +139,15 @@ private void initCatalog(PartitionSpec spec, String fileFormat) throws Initializ
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .bucket("department", 3)
                 .build();
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        initCatalog(spec, fileFormat);
+        initCatalog(spec, FileFormat.AVRO.name());

Review Comment:
   I meant you should change the initCatalog method:
   `initCatalog(PartitionSpec spec, FileFormat fileFormat)`
   And call the `name()` method in that function so you don't need to call it at every method call.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -466,8 +511,52 @@ public void testPrimitives(FileFormat format) throws IOException {
         assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
 
-        if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesIgnoreMissingFields(FileFormat format) throws IOException {

Review Comment:
   This test is still failing on AVRO and PARQUET if the Iceberg schema fields are set to required from optional. 



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -123,8 +135,12 @@ private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataT
             // set NiFi schema field names (sourceFieldName) in the data converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isEmpty()) {
+                    converter.setSourceFieldName(converter.getTargetFieldName());
+                } else {
+                    final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
+                    converter.setSourceFieldName(recordField.get().getFieldName());
+                }

Review Comment:
   This still needs to be addressed.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -466,8 +511,52 @@ public void testPrimitives(FileFormat format) throws IOException {
         assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
 
-        if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testPrimitivesIgnoreMissingFields(FileFormat format) throws IOException {

Review Comment:
   This test is still failing on AVRO and PARQUET if the Iceberg schema fields are set to required from optional. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1273247443


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -123,8 +135,12 @@ private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataT
             // set NiFi schema field names (sourceFieldName) in the data converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isEmpty()) {
+                    converter.setSourceFieldName(converter.getTargetFieldName());
+                } else {
+                    final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
+                    converter.setSourceFieldName(recordField.get().getFieldName());
+                }

Review Comment:
   > I think we shouldn't set the `target field name` to the` source field name` because it is confusing. The `source name` represents the `NiFi record field name` and the `target name `represents the `iceberg record field name`. We should leave it null and set only the target.
   
   This still needs to be addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mattyb149 commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mattyb149 (via GitHub)" <gi...@apache.org>.
mattyb149 commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1272452154


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -466,19 +507,104 @@ public void testPrimitives(FileFormat format) throws IOException {
         assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
 
-        if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
-        } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
-        }
+        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCompatiblePrimitives(FileFormat format) throws IOException {
+    @Test
+    public void testPrimitivesIgnoreMissingFields() throws IOException {

Review Comment:
   I did it almost across the board just to increase performance of the unit tests, I will add them in for the new tests to ensure they pass all formats.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1259330971


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -130,6 +132,9 @@ static class UUIDtoByteArrayConverter extends DataConverter<Object, byte[]> {
         @Override
         public byte[] convert(Object data) {
             final UUID uuid = DataTypeUtils.toUUID(data);
+            if (uuid == null) {

Review Comment:
   This part is unreachable since the `DataTypeUtils.toUUID` will throw a `IllegalTypeConversionException` if you pass a null value to it.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -123,8 +135,12 @@ private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataT
             // set NiFi schema field names (sourceFieldName) in the data converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isEmpty()) {
+                    converter.setSourceFieldName(converter.getTargetFieldName());
+                } else {
+                    final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
+                    converter.setSourceFieldName(recordField.get().getFieldName());
+                }

Review Comment:
   I think we shouldn't set the `target field name` to the` source field name` because it is confusing. The `source name` represents the `NiFi record field name` and the `target name `represents the `iceberg record field name`. We should leave it null and set only the target.
   ```suggestion
                   if (mappedFieldName.isPresent()) {
                       final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
                       converter.setSourceFieldName(recordField.get().getFieldName());
                   }
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -245,9 +250,14 @@ static class RecordConverter extends DataConverter<Record, GenericRecord> {
 
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<RecordField> recordField = recordSchema.getField(converter.getSourceFieldName());
-                final RecordField field = recordField.get();
-                // creates a record field accessor for every data converter
-                getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable()));
+                if (recordField.isEmpty()) {
+                    Types.NestedField missingField = schema.field(converter.getSourceFieldName());

Review Comment:
   Here you should get the `targetFieldName` since it contains the iceberg schema field's name.
   ```suggestion
                       Types.NestedField missingField = schema.field(converter.getTargetFieldName());
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -466,19 +507,104 @@ public void testPrimitives(FileFormat format) throws IOException {
         assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
 
-        if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
-        } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
-        }
+        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCompatiblePrimitives(FileFormat format) throws IOException {
+    @Test
+    public void testPrimitivesIgnoreMissingFields() throws IOException {

Review Comment:
   If I set the Iceberg schema fields from optional to required the new tests are failing on `AVRO` and `PARQUET`. This is why it is not recommended to remove the `ParameterizedTest` since it is testing different writers for the different formats. 



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java:
##########
@@ -134,12 +133,12 @@ private void initCatalog(PartitionSpec spec, String fileFormat) throws Initializ
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .bucket("department", 3)
                 .build();
+        final String fileFormat = "avro";

Review Comment:
   Please use `org.apache.iceberg.FileFormat` instead of strings. Also you can pass this parameter directly to the `initCatalog` method since it is only used there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] joewitt commented on pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "joewitt (via GitHub)" <gi...@apache.org>.
joewitt commented on PR #7421:
URL: https://github.com/apache/nifi/pull/7421#issuecomment-1742161608

   @mark-bathori Any further inputs?  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1242475244


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -473,6 +516,103 @@ public void testPrimitives(FileFormat format) throws IOException {
         }
     }
 
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})

Review Comment:
   One general note on these tests, it it probably worth avoiding some of the repeated tests in the interest of execution time. Several other tests ran through all formats, requiring over a minute to exercise all cases. Keeping unit test method runs under a second would be ideal, so these new tests may need to be more narrowly scoped to avoid excessive running time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mark-bathori commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1272621601


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java:
##########
@@ -134,12 +133,12 @@ private void initCatalog(PartitionSpec spec, String fileFormat) throws Initializ
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .bucket("department", 3)
                 .build();
+        final String fileFormat = "avro";

Review Comment:
   Yes the parameter type can be changed in the `initCatalog` method and you can call `.name()` in it.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -123,8 +135,12 @@ private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataT
             // set NiFi schema field names (sourceFieldName) in the data converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isEmpty()) {
+                    converter.setSourceFieldName(converter.getTargetFieldName());
+                } else {
+                    final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
+                    converter.setSourceFieldName(recordField.get().getFieldName());
+                }

Review Comment:
   Yes, you don't need to set it since you have it already in the targetFieldName attribute.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] NIFI-11739: Add ability to ignore missing fields in PutIceberg [nifi]

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1342539976


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -433,11 +494,11 @@ private static Record setupChoiceTestRecord() {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest

Review Comment:
   Please don't remove `@ParameterizedTest` in this test class since it is testing different writers for different format types.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -267,6 +278,7 @@ public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile
 
             final WriteResult result = taskWriter.complete();
             appendDataFiles(context, flowFile, table, result);
+            taskWriter.close();

Review Comment:
   This is not needed. The `complete()` method closes the task writer.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -156,8 +174,25 @@ public DataType fieldPartner(DataType dataType, int fieldId, String name) {
             final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType;
 
             final Optional<String> mappedFieldName = recordType.getNameMapping(name);
-            Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
-
+            if (UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
+                Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
+            }
+            if (mappedFieldName.isEmpty()) {
+                if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
+                    if (logger != null) {

Review Comment:
   If the constructor with null value logger will be removed, this check can be removed too.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -46,19 +48,29 @@
 public class IcebergRecordConverter {
 
     private final DataConverter<Record, GenericRecord> converter;
+    public final UnmatchedColumnBehavior unmatchedColumnBehavior;
+    public ComponentLog logger;
+
     public GenericRecord convert(Record record) {
         return converter.convert(record);
     }
 
-    @SuppressWarnings("unchecked")
     public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
-        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+        this(schema, recordSchema, fileFormat, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, null);

Review Comment:
   As I can see this constructor was added only for unit tests to have default null value for logger. I think this should be removed and add a logger to the unit test that can be passed as parameter to the other constructor.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -265,9 +267,14 @@ static class RecordConverter extends DataConverter<Record, GenericRecord> {
 
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<RecordField> recordField = recordSchema.getField(converter.getSourceFieldName());
-                final RecordField field = recordField.get();
-                // creates a record field accessor for every data converter
-                getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable()));
+                if (recordField.isEmpty()) {
+                    Types.NestedField missingField = schema.field(converter.getTargetFieldName());

Review Comment:
   This can be final.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -290,4 +297,54 @@ private <S, T> T convert(Record record, DataConverter<S, T> converter) {
             return converter.convert((S) getters.get(converter.getTargetFieldName()).getFieldOrNull(record));
         }
     }
+
+    public static DataType convertSchemaTypeToDataType(Type schemaType) {

Review Comment:
   I think it would be better to throw an exception in case of unmatching schema type than returning with a null value.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -504,9 +609,80 @@ public void testPrimitives(FileFormat format) throws IOException {
     @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCompatiblePrimitives(FileFormat format) throws IOException {
+    public void testPrimitivesMissingRequiredFields(FileFormat format) {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        Record record = setupPrimitivesTestRecordMissingFields();

Review Comment:
   This field is unused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mattyb149 commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mattyb149 (via GitHub)" <gi...@apache.org>.
mattyb149 commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1272450347


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -123,8 +135,12 @@ private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataT
             // set NiFi schema field names (sourceFieldName) in the data converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isEmpty()) {
+                    converter.setSourceFieldName(converter.getTargetFieldName());
+                } else {
+                    final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
+                    converter.setSourceFieldName(recordField.get().getFieldName());
+                }

Review Comment:
   I think the target field name here is necessary because the source field is missing right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mattyb149 commented on a diff in pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg

Posted by "mattyb149 (via GitHub)" <gi...@apache.org>.
mattyb149 commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1272460805


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java:
##########
@@ -134,12 +133,12 @@ private void initCatalog(PartitionSpec spec, String fileFormat) throws Initializ
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .bucket("department", 3)
                 .build();
+        final String fileFormat = "avro";

Review Comment:
   initCatalog already uses Strings, are you saying we should change the parameter type to FileFormat? If not, I have changed `"avro"` to `FileFormat.AVRO.name()` and put it directly in the call to initCatalog. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] NIFI-11739: Add ability to ignore missing fields in PutIceberg [nifi]

Posted by "mark-bathori (via GitHub)" <gi...@apache.org>.
mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1342676660


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -156,8 +174,25 @@ public DataType fieldPartner(DataType dataType, int fieldId, String name) {
             final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType;
 
             final Optional<String> mappedFieldName = recordType.getNameMapping(name);
-            Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
-
+            if (UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
+                Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
+            }
+            if (mappedFieldName.isEmpty()) {
+                if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
+                    if (logger != null) {
+                        logger.warn("Cannot find field with name '" + name + "' in the record schema, using the target schema for datatype and a null value");
+                    }
+                }
+                // If the field is missing, use the expected type from the schema (converted to a DataType)
+                final Types.NestedField schemaField = schema.findField(fieldId);
+                final Type schemaFieldType = schemaField.type();
+                if(schemaField.isRequired()) {

Review Comment:
   ```suggestion
                   if (schemaField.isRequired()) {
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -108,6 +108,14 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+            .name("unmatched-column-behavior")
+            .displayName("Unmatched Column Behavior")
+            .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
+            .allowableValues(UnmatchedColumnBehavior.class)
+            .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
+            .required(true)
+            .build();

Review Comment:
   ```suggestion
               .build();
               
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -108,6 +108,14 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+            .name("unmatched-column-behavior")
+            .displayName("Unmatched Column Behavior")
+            .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")

Review Comment:
   ```suggestion
               .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] NIFI-11739: Add ability to ignore missing fields in PutIceberg [nifi]

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #7421:
URL: https://github.com/apache/nifi/pull/7421#issuecomment-1744787034

   Thanks @mattyb149 and thanks for the review @mark-bathori , merging now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] NIFI-11739: Add ability to ignore missing fields in PutIceberg [nifi]

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #7421: NIFI-11739: Add ability to ignore missing fields in PutIceberg
URL: https://github.com/apache/nifi/pull/7421


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org