You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/17 14:53:55 UTC

nifi git commit: NIFI-3919: Let AvroTypeUtil try every possible type

Repository: nifi
Updated Branches:
  refs/heads/master f019d509f -> 40a9cd4f2


NIFI-3919: Let AvroTypeUtil try every possible type

Before this fix, AvroTypeUtil could throw an Exception before trying every possible data type defined within a union field.

This closes #1816.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40a9cd4f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40a9cd4f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40a9cd4f

Branch: refs/heads/master
Commit: 40a9cd4f2ead4d0388581921b0de9108a6e97946
Parents: f019d50
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed May 17 15:34:01 2017 +0900
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed May 17 10:53:27 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 20 ++++++---
 .../nifi-record-serialization-services/pom.xml  |  1 +
 .../avro/TestAvroReaderWithEmbeddedSchema.java  | 43 ++++++++++++++++++++
 .../src/test/resources/avro/multiple-types.avsc | 16 ++++++++
 4 files changed, 75 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 6494fe5..daf4031 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -39,6 +39,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -58,6 +60,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class AvroTypeUtil {
+    private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
     public static final String AVRO_SCHEMA_FORMAT = "avro";
 
     private static final String LOGICAL_TYPE_DATE = "date";
@@ -459,12 +462,19 @@ public class AvroTypeUtil {
         // If at least one non-null type exists, find the first compatible type
         if (nonNullFieldSchemas.size() >= 1) {
             for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
-                final Object convertedValue = conversion.apply(nonNullFieldSchema);
                 final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
-                if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType)
-                        // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
-                        || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
-                    return convertedValue;
+                try {
+                    final Object convertedValue = conversion.apply(nonNullFieldSchema);
+                    if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType)
+                            // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
+                            || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
+                        return convertedValue;
+                    }
+                } catch (Exception e) {
+                    // If failed with one of possible types, continue with the next available option.
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 5dc1160..388d52f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -95,6 +95,7 @@
                         <exclude>src/test/resources/avro/datatypes.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
+                        <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
                         <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/single-bank-account.csv</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index 6782d33..5c04cfa 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -290,6 +290,49 @@ public class TestAvroReaderWithEmbeddedSchema {
         }
     }
 
+    @Test
+    public void testMultipleTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/multiple-types.avsc"));
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final byte[] serialized;
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+             final DataFileWriter<GenericRecord> writer = dataFileWriter.create(schema, baos)) {
+
+            // If a union field has multiple type options, a value should be mapped to the first compatible type.
+            final GenericRecord r1 = new GenericData.Record(schema);
+            r1.put("field", 123);
+
+            final GenericRecord r2 = new GenericData.Record(schema);
+            r2.put("field", Arrays.asList(1, 2, 3));
+
+            final GenericRecord r3 = new GenericData.Record(schema);
+            r3.put("field", "not a number");
+
+            writer.append(r1);
+            writer.append(r2);
+            writer.append(r3);
+            writer.flush();
+
+            serialized = baos.toByteArray();
+        }
+
+        try (final InputStream in = new ByteArrayInputStream(serialized)) {
+            final AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in);
+            final RecordSchema recordSchema = reader.getSchema();
+
+            assertEquals(RecordFieldType.CHOICE, recordSchema.getDataType("field").get().getFieldType());
+
+            Record record = reader.nextRecord();
+            assertEquals(123, record.getValue("field"));
+            record = reader.nextRecord();
+            assertArrayEquals(new Object[]{1, 2, 3}, (Object[]) record.getValue("field"));
+            record = reader.nextRecord();
+            assertEquals("not a number", record.getValue("field"));
+        }
+    }
+
     private Object[] toObjectArray(final byte[] bytes) {
         final Object[] array = new Object[bytes.length];
         for (int i = 0; i < bytes.length; i++) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc
new file mode 100644
index 0000000..670555e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc
@@ -0,0 +1,16 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+      "name": "field",
+      "type": [
+        "null",
+        "int",
+        {"type": "array", "items": "int"},
+        "string"
+      ]
+    }
+  ]
+}
\ No newline at end of file