You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2019/02/14 13:32:21 UTC
[nifi] branch master updated: NIFI-5943 support conversions from
List to Avro ARRAY and from Map to Avro RECORD NIFI-5943 Added another unit
test to verify list + map conversion to list of records. (Mike Thomsen)
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e7ae977 NIFI-5943 support conversions from List to Avro ARRAY and from Map to Avro RECORD NIFI-5943 Added another unit test to verify list + map conversion to list of records. (Mike Thomsen)
e7ae977 is described below
commit e7ae97797efe591e4c2e46a70f3e9c834072c8e9
Author: Alex Savitsky <al...@scotiabank.com>
AuthorDate: Mon Jan 14 08:49:13 2019 -0500
NIFI-5943 support conversions from List to Avro ARRAY and from Map to Avro RECORD
NIFI-5943 Added another unit test to verify list + map conversion to list of records. (Mike Thomsen)
This closes #3267
Signed-off-by: Mike Thomsen <mi...@gmail.com>
---
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 28 ++++++--
.../org/apache/nifi/avro/TestAvroTypeUtil.java | 76 ++++++++++++++++++----
2 files changed, 85 insertions(+), 19 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 043f7ab..4b13226 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -59,6 +59,7 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -754,10 +755,20 @@ public class AvroTypeUtil {
case RECORD:
final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
- final Record record = (Record) rawValue;
- for (final RecordField recordField : record.getSchema().getFields()) {
- final Object recordFieldValue = record.getValue(recordField);
- final String recordFieldName = recordField.getFieldName();
+ final Set<Map.Entry<String, Object>> entries;
+ if (rawValue instanceof Map) {
+ final Map<String, Object> map = (Map<String, Object>) rawValue;
+ entries = map.entrySet();
+ } else if (rawValue instanceof Record) {
+ entries = new HashSet<>();
+ final Record record = (Record) rawValue;
+ record.getSchema().getFields().forEach(field -> entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(), record.getValue(field))));
+ } else {
+ throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
+ }
+ for (final Map.Entry<String, Object> e : entries) {
+ final Object recordFieldValue = e.getValue();
+ final String recordFieldName = e.getKey();
final Field field = fieldSchema.getField(recordFieldName);
if (field == null) {
@@ -771,7 +782,14 @@ public class AvroTypeUtil {
case UNION:
return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
case ARRAY:
- final Object[] objectArray = (Object[]) rawValue;
+ final Object[] objectArray;
+ if (rawValue instanceof List) {
+ objectArray = ((List) rawValue).toArray();
+ } else if (rawValue instanceof Object[]) {
+ objectArray = (Object[]) rawValue;
+ } else {
+ throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
+ }
final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 004999f..3431da6 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -45,11 +45,13 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -455,23 +457,23 @@ public class TestAvroTypeUtil {
@Test
public void testAliasCreatedForInvalidField() {
- final List<RecordField> fields = new ArrayList<>();
- fields.add(new RecordField("valid", RecordFieldType.STRING.getDataType()));
- fields.add(new RecordField("$invalid2", RecordFieldType.STRING.getDataType()));
- fields.add(new RecordField("3invalid3", RecordFieldType.STRING.getDataType()));
- fields.add(new RecordField(" __ Another ONE!!", RecordFieldType.STRING.getDataType()));
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("valid", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("$invalid2", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("3invalid3", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField(" __ Another ONE!!", RecordFieldType.STRING.getDataType()));
- final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
- final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
- assertNotNull(avroSchema.getField("valid"));
+ final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
+ assertNotNull(avroSchema.getField("valid"));
- assertNull(avroSchema.getField("$invalid"));
- final Field field2 = avroSchema.getField("_invalid2");
- assertNotNull(field2);
- assertEquals("_invalid2", field2.name());
- assertEquals(1, field2.aliases().size());
- assertTrue(field2.aliases().contains("$invalid2"));
+ assertNull(avroSchema.getField("$invalid"));
+ final Field field2 = avroSchema.getField("_invalid2");
+ assertNotNull(field2);
+ assertEquals("_invalid2", field2.name());
+ assertEquals(1, field2.aliases().size());
+ assertTrue(field2.aliases().contains("$invalid2"));
assertNull(avroSchema.getField("$invalid3"));
final Field field3 = avroSchema.getField("_invalid3");
@@ -486,6 +488,52 @@ public class TestAvroTypeUtil {
assertEquals("_____Another_ONE__", field4.name());
assertEquals(1, field4.aliases().size());
assertTrue(field4.aliases().contains(" __ Another ONE!!"));
+ }
+
+ public void testListToArrayConversion() {
+ final Charset charset = Charset.forName("UTF-8");
+ Object o = AvroTypeUtil.convertToAvroObject(Collections.singletonList("Hello"), Schema.createArray(Schema.create(Type.STRING)), charset);
+ assertTrue(o instanceof List);
+ assertEquals(1, ((List) o).size());
+ assertEquals("Hello", ((List) o).get(0));
+ }
+ @Test
+ public void testMapToRecordConversion() {
+ final Charset charset = Charset.forName("UTF-8");
+ Object o = AvroTypeUtil.convertToAvroObject(Collections.singletonMap("Hello", "World"),
+ Schema.createRecord(Collections.singletonList(new Field("Hello", Schema.create(Type.STRING), "", ""))), charset);
+ assertTrue(o instanceof Record);
+ assertEquals("World", ((Record) o).get("Hello"));
+ }
+
+ @Test
+ public void testListAndMapConversion() {
+ Schema s = Schema.createRecord(Arrays.asList(
+ new Field("List", Schema.createArray(Schema.createRecord(
+ Arrays.asList(
+ new Field("Message", Schema.create(Type.STRING), "", "")
+ )
+ )), "", null)
+ ));
+
+ Map<String, Object> obj = new HashMap<>();
+ List<Map<String, Object>> list = new ArrayList<>();
+ for (int x = 0; x < 10; x++) {
+ list.add(new HashMap<String, Object>(){{
+ put("Message", UUID.randomUUID().toString());
+ }});
+ }
+ obj.put("List", list);
+
+ Object o = AvroTypeUtil.convertToAvroObject(obj, s);
+ assertTrue(o instanceof Record);
+ List innerList = (List)((Record)o).get("List");
+ assertNotNull( innerList );
+ assertEquals(10, innerList.size());
+ for (Object inner : innerList) {
+ assertTrue(inner instanceof Record);
+ assertNotNull(((Record)inner).get("Message"));
+ }
}
}