You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/12/25 16:58:24 UTC
[nifi] branch main updated: NIFI-10677: Add Choice data type handling to Iceberg record converter
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4b24bcd2f8 NIFI-10677: Add Choice data type handling to Iceberg record converter
4b24bcd2f8 is described below
commit 4b24bcd2f823abe1534ab3392245c9c60a1293ee
Author: Mark Bathori <ba...@gmail.com>
AuthorDate: Fri Oct 21 12:58:05 2022 +0200
NIFI-10677: Add Choice data type handling to Iceberg record converter
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #6563.
---
.../iceberg/converter/ArrayElementGetter.java | 17 +++++-
.../iceberg/converter/RecordFieldGetter.java | 18 +++++-
.../iceberg/TestIcebergRecordConverter.java | 67 ++++++++++++++++++++--
3 files changed, 94 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
index 9a98404b26..a344575973 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
@@ -19,7 +19,9 @@ package org.apache.nifi.processors.iceberg.converter;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -92,8 +94,21 @@ public class ArrayElementGetter {
case RECORD:
elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME);
break;
+ case CHOICE:
+ elementGetter = (array, pos) -> {
+ final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ final DataType chosenDataType = DataTypeUtils.chooseDataType(array[pos], choiceDataType);
+ if (chosenDataType == null) {
+ throw new IllegalTypeConversionException(String.format(
+ "Cannot convert value [%s] of type %s for array element to any of the following available Sub-Types for a Choice: %s",
+ array[pos], array[pos].getClass(), choiceDataType.getPossibleSubTypes()));
+ }
+
+ return DataTypeUtils.convertType(array[pos], chosenDataType, ARRAY_FIELD_NAME);
+ };
+ break;
default:
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
}
return (array, pos) -> {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
index ca50c49f89..24a21d6ef7 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
@@ -20,8 +20,10 @@ package org.apache.nifi.processors.iceberg.converter;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -94,8 +96,22 @@ public class RecordFieldGetter {
case RECORD:
fieldGetter = record -> record.getAsRecord(fieldName, ((RecordDataType) dataType).getChildSchema());
break;
+ case CHOICE:
+ fieldGetter = record -> {
+ final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ final Object value = record.getValue(fieldName);
+ final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType);
+ if (chosenDataType == null) {
+ throw new IllegalTypeConversionException(String.format(
+ "Cannot convert value [%s] of type %s for field %s to any of the following available Sub-Types for a Choice: %s",
+ value, value.getClass(), fieldName, choiceDataType.getPossibleSubTypes()));
+ }
+
+ return DataTypeUtils.convertType(record.getValue(fieldName), chosenDataType, fieldName);
+ };
+ break;
default:
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
}
if (!isNullable) {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index 1308316e16..759b0bc576 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -38,8 +38,11 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter;
import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -131,7 +134,8 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(10, "time", Types.TimeType.get()),
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
- Types.NestedField.optional(13, "uuid", Types.UUIDType.get())
+ Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
+ Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
private static RecordSchema getStructSchema() {
@@ -188,6 +192,16 @@ public class TestIcebergRecordConverter {
fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
+ fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())));
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private static RecordSchema getChoiceSchema() { // fields choice1..choice3 (the keys set by setupChoiceTestRecord), each a STRING|INT|LONG choice
+ List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("choice1", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType())));
+ fields.add(new RecordField("choice2", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType())));
+ fields.add(new RecordField("choice3", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType())));
return new SimpleRecordSchema(fields);
}
@@ -233,7 +247,7 @@ public class TestIcebergRecordConverter {
return new MapRecord(getMapSchema(), values);
}
- private static Record setupPrimitivesTestRecord(RecordSchema schema) {
+ private static Record setupPrimitivesTestRecord() {
LocalDate localDate = LocalDate.of(2017, 4, 4);
LocalTime localTime = LocalTime.of(14, 20, 33);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
@@ -254,14 +268,24 @@ public class TestIcebergRecordConverter {
values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
values.put("timestampTz", Timestamp.valueOf(localDateTime));
values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
+ values.put("choice", "10");
- return new MapRecord(schema, values);
+ return new MapRecord(getPrimitivesSchema(), values);
+ }
+
+ private static Record setupChoiceTestRecord() { // record of raw String values used by testChoiceDataTypeInRecord to exercise CHOICE resolution
+ Map<String, Object> values = new HashMap<>(); // NOTE(review): keys must line up with the field names declared by getChoiceSchema()
+ values.put("choice1", "20"); // numeric text that fits in an int
+ values.put("choice2", "30a"); // non-numeric text
+ values.put("choice3", String.valueOf(Long.MAX_VALUE)); // numeric text beyond the int range
+
+ return new MapRecord(getChoiceSchema(), values);
}
@Test
public void testPrimitivesAvro() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
- Record record = setupPrimitivesTestRecord(nifiSchema);
+ Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO);
GenericRecord genericRecord = recordConverter.convert(record);
@@ -290,13 +314,14 @@ public class TestIcebergRecordConverter {
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+ Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@DisabledOnOs(WINDOWS)
@Test
public void testPrimitivesOrc() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
- Record record = setupPrimitivesTestRecord(nifiSchema);
+ Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC);
GenericRecord genericRecord = recordConverter.convert(record);
@@ -325,12 +350,13 @@ public class TestIcebergRecordConverter {
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
+ Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@Test
public void testPrimitivesParquet() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
- Record record = setupPrimitivesTestRecord(nifiSchema);
+ Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
@@ -359,6 +385,7 @@ public class TestIcebergRecordConverter {
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+ Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@Test
@@ -647,6 +674,34 @@ public class TestIcebergRecordConverter {
assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
}
+ @Test
+ public void testChoiceDataTypeInRecord() { // each CHOICE field must come back converted to the matching sub-type's value, not just the matching class
+ Record record = setupChoiceTestRecord();
+ DataType dataType = RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType());
+
+ RecordFieldGetter.FieldGetter fieldGetter1 = RecordFieldGetter.createFieldGetter(dataType, "choice1", true);
+ RecordFieldGetter.FieldGetter fieldGetter2 = RecordFieldGetter.createFieldGetter(dataType, "choice2", true);
+ RecordFieldGetter.FieldGetter fieldGetter3 = RecordFieldGetter.createFieldGetter(dataType, "choice3", true);
+
+ Assertions.assertEquals(Integer.valueOf(20), fieldGetter1.getFieldOrNull(record)); // Integer.equals also rejects a Long/String result
+ Assertions.assertEquals("30a", fieldGetter2.getFieldOrNull(record));
+ Assertions.assertEquals(Long.valueOf(Long.MAX_VALUE), fieldGetter3.getFieldOrNull(record));
+ }
+
+ @Test
+ public void testChoiceDataTypeInArray() { // each array element must come back converted to the matching CHOICE sub-type's value
+ DataType dataType = RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType());
+ ArrayElementGetter.ElementGetter elementGetter = ArrayElementGetter.createElementGetter(dataType);
+
+ String[] testArray = {"20", "30a", String.valueOf(Long.MAX_VALUE)};
+
+ Assertions.assertEquals(Integer.valueOf(20), elementGetter.getElementOrNull(testArray, 0)); // Integer.equals also rejects a Long/String result
+ Assertions.assertEquals("30a", elementGetter.getElementOrNull(testArray, 1));
+ Assertions.assertEquals(Long.valueOf(Long.MAX_VALUE), elementGetter.getElementOrNull(testArray, 2));
+ }
+
public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
try (FileAppender<GenericRecord> appender = Avro.write(outputFile)
.schema(schema)