Posted to commits@nifi.apache.org by pv...@apache.org on 2022/12/25 16:58:24 UTC

[nifi] branch main updated: NIFI-10677: Add Choice data type handling to Iceberg record converter

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b24bcd2f8 NIFI-10677: Add Choice data type handling to Iceberg record converter
4b24bcd2f8 is described below

commit 4b24bcd2f823abe1534ab3392245c9c60a1293ee
Author: Mark Bathori <ba...@gmail.com>
AuthorDate: Fri Oct 21 12:58:05 2022 +0200

    NIFI-10677: Add Choice data type handling to Iceberg record converter
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #6563.
---
 .../iceberg/converter/ArrayElementGetter.java      | 17 +++++-
 .../iceberg/converter/RecordFieldGetter.java       | 18 +++++-
 .../iceberg/TestIcebergRecordConverter.java        | 67 ++++++++++++++++++++--
 3 files changed, 94 insertions(+), 8 deletions(-)
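
 Editor's note (not part of the commit): the hunks below add a CHOICE branch to both
 getters, resolving each value against the choice's sub-types via
 DataTypeUtils.chooseDataType and converting it with DataTypeUtils.convertType. The
 following is a minimal sketch of that resolution flow using the same NiFi record
 utilities; the class and method names of the sketch itself are illustrative only.

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.type.ChoiceDataType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class ChoiceResolutionSketch {

        // Resolves a raw value against a CHOICE of STRING, INT and LONG,
        // mirroring what the new getters do for array elements and record fields.
        public static Object resolve(final Object value, final String fieldName) {
            final ChoiceDataType choiceType = (ChoiceDataType) RecordFieldType.CHOICE.getChoiceDataType(
                    RecordFieldType.STRING.getDataType(),
                    RecordFieldType.INT.getDataType(),
                    RecordFieldType.LONG.getDataType());

            // chooseDataType picks a compatible sub-type for the value; in the new tests
            // "20" resolves to INT, "30a" to STRING, and Long.MAX_VALUE (as text) to LONG.
            final DataType chosenType = DataTypeUtils.chooseDataType(value, choiceType);
            if (chosenType == null) {
                // The committed getters throw IllegalTypeConversionException here.
                throw new IllegalArgumentException("No compatible Choice sub-type for value: " + value);
            }

            return DataTypeUtils.convertType(value, chosenType, fieldName);
        }
    }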

diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
index 9a98404b26..a344575973 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
@@ -19,7 +19,9 @@ package org.apache.nifi.processors.iceberg.converter;
 
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
@@ -92,8 +94,21 @@ public class ArrayElementGetter {
             case RECORD:
                 elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME);
                 break;
+            case CHOICE:
+                elementGetter = (array, pos) -> {
+                    final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+                    final DataType chosenDataType = DataTypeUtils.chooseDataType(array[pos], choiceDataType);
+                    if (chosenDataType == null) {
+                        throw new IllegalTypeConversionException(String.format(
+                                "Cannot convert value [%s] of type %s for array element to any of the following available Sub-Types for a Choice: %s",
+                                array[pos], array[pos].getClass(), choiceDataType.getPossibleSubTypes()));
+                    }
+
+                    return DataTypeUtils.convertType(array[pos], chosenDataType, ARRAY_FIELD_NAME);
+                };
+                break;
             default:
-                throw new IllegalArgumentException();
+                throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
         }
 
         return (array, pos) -> {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
index ca50c49f89..24a21d6ef7 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
@@ -20,8 +20,10 @@ package org.apache.nifi.processors.iceberg.converter;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
@@ -94,8 +96,22 @@ public class RecordFieldGetter {
             case RECORD:
                 fieldGetter = record -> record.getAsRecord(fieldName, ((RecordDataType) dataType).getChildSchema());
                 break;
+            case CHOICE:
+                fieldGetter = record -> {
+                    final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+                    final Object value = record.getValue(fieldName);
+                    final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType);
+                    if (chosenDataType == null) {
+                        throw new IllegalTypeConversionException(String.format(
+                                "Cannot convert value [%s] of type %s for field %s to any of the following available Sub-Types for a Choice: %s",
+                                value, value.getClass(), fieldName, choiceDataType.getPossibleSubTypes()));
+                    }
+
+                    return DataTypeUtils.convertType(record.getValue(fieldName), chosenDataType, fieldName);
+                };
+                break;
             default:
-                throw new IllegalArgumentException();
+                throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
         }
 
         if (!isNullable) {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index 1308316e16..759b0bc576 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -38,8 +38,11 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter;
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -131,7 +134,8 @@ public class TestIcebergRecordConverter {
             Types.NestedField.optional(10, "time", Types.TimeType.get()),
             Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
             Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
-            Types.NestedField.optional(13, "uuid", Types.UUIDType.get())
+            Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
+            Types.NestedField.optional(14, "choice", Types.IntegerType.get())
     );
 
     private static RecordSchema getStructSchema() {
@@ -188,6 +192,16 @@ public class TestIcebergRecordConverter {
         fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
+        fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())));
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private static RecordSchema getChoiceSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("string", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("integer", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("float", RecordFieldType.LONG.getDataType()));
 
         return new SimpleRecordSchema(fields);
     }
@@ -233,7 +247,7 @@ public class TestIcebergRecordConverter {
         return new MapRecord(getMapSchema(), values);
     }
 
-    private static Record setupPrimitivesTestRecord(RecordSchema schema) {
+    private static Record setupPrimitivesTestRecord() {
         LocalDate localDate = LocalDate.of(2017, 4, 4);
         LocalTime localTime = LocalTime.of(14, 20, 33);
         LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
@@ -254,14 +268,24 @@ public class TestIcebergRecordConverter {
         values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
         values.put("timestampTz", Timestamp.valueOf(localDateTime));
         values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
+        values.put("choice", "10");
 
-        return new MapRecord(schema, values);
+        return new MapRecord(getPrimitivesSchema(), values);
+    }
+
+    private static Record setupChoiceTestRecord() {
+        Map<String, Object> values = new HashMap<>();
+        values.put("choice1", "20");
+        values.put("choice2", "30a");
+        values.put("choice3", String.valueOf(Long.MAX_VALUE));
+
+        return new MapRecord(getChoiceSchema(), values);
     }
 
     @Test
     public void testPrimitivesAvro() throws IOException {
         RecordSchema nifiSchema = getPrimitivesSchema();
-        Record record = setupPrimitivesTestRecord(nifiSchema);
+        Record record = setupPrimitivesTestRecord();
 
         IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO);
         GenericRecord genericRecord = recordConverter.convert(record);
@@ -290,13 +314,14 @@ public class TestIcebergRecordConverter {
         Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
         Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
         Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+        Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
     }
 
     @DisabledOnOs(WINDOWS)
     @Test
     public void testPrimitivesOrc() throws IOException {
         RecordSchema nifiSchema = getPrimitivesSchema();
-        Record record = setupPrimitivesTestRecord(nifiSchema);
+        Record record = setupPrimitivesTestRecord();
 
         IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC);
         GenericRecord genericRecord = recordConverter.convert(record);
@@ -325,12 +350,13 @@ public class TestIcebergRecordConverter {
         Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
         Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
         Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+        Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
     }
 
     @Test
     public void testPrimitivesParquet() throws IOException {
         RecordSchema nifiSchema = getPrimitivesSchema();
-        Record record = setupPrimitivesTestRecord(nifiSchema);
+        Record record = setupPrimitivesTestRecord();
 
         IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET);
         GenericRecord genericRecord = recordConverter.convert(record);
@@ -359,6 +385,7 @@ public class TestIcebergRecordConverter {
         Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
         Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
         Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+        Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
     }
 
     @Test
@@ -647,6 +674,34 @@ public class TestIcebergRecordConverter {
         assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
     }
 
+    @Test
+    public void testChoiceDataTypeInRecord() {
+        Record record = setupChoiceTestRecord();
+        DataType dataType = RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType());
+
+        RecordFieldGetter.FieldGetter fieldGetter1 = RecordFieldGetter.createFieldGetter(dataType, "choice1", true);
+        RecordFieldGetter.FieldGetter fieldGetter2 = RecordFieldGetter.createFieldGetter(dataType, "choice2", true);
+        RecordFieldGetter.FieldGetter fieldGetter3 = RecordFieldGetter.createFieldGetter(dataType, "choice3", true);
+
+        Assertions.assertInstanceOf(Integer.class, fieldGetter1.getFieldOrNull(record));
+        Assertions.assertInstanceOf(String.class, fieldGetter2.getFieldOrNull(record));
+        Assertions.assertInstanceOf(Long.class, fieldGetter3.getFieldOrNull(record));
+    }
+
+    @Test
+    public void testChoiceDataTypeInArray() {
+        DataType dataType = RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType());
+        ArrayElementGetter.ElementGetter elementGetter = ArrayElementGetter.createElementGetter(dataType);
+
+        String[] testArray = {"20", "30a", String.valueOf(Long.MAX_VALUE)};
+
+        Assertions.assertInstanceOf(Integer.class, elementGetter.getElementOrNull(testArray, 0));
+        Assertions.assertInstanceOf(String.class, elementGetter.getElementOrNull(testArray, 1));
+        Assertions.assertInstanceOf(Long.class, elementGetter.getElementOrNull(testArray, 2));
+    }
+
     public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
         try (FileAppender<GenericRecord> appender = Avro.write(outputFile)
                 .schema(schema)