You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/17 14:53:55 UTC
nifi git commit: NIFI-3919: Let AvroTypeUtil try every possible type
Repository: nifi
Updated Branches:
refs/heads/master f019d509f -> 40a9cd4f2
NIFI-3919: Let AvroTypeUtil try every possible type
Before this fix, AvroTypeUtil could throw an Exception before trying every possible data type defined within a union field.
This closes #1816.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40a9cd4f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40a9cd4f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40a9cd4f
Branch: refs/heads/master
Commit: 40a9cd4f2ead4d0388581921b0de9108a6e97946
Parents: f019d50
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed May 17 15:34:01 2017 +0900
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed May 17 10:53:27 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 20 ++++++---
.../nifi-record-serialization-services/pom.xml | 1 +
.../avro/TestAvroReaderWithEmbeddedSchema.java | 43 ++++++++++++++++++++
.../src/test/resources/avro/multiple-types.avsc | 16 ++++++++
4 files changed, 75 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 6494fe5..daf4031 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -39,6 +39,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
@@ -58,6 +60,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
public class AvroTypeUtil {
+ private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
public static final String AVRO_SCHEMA_FORMAT = "avro";
private static final String LOGICAL_TYPE_DATE = "date";
@@ -459,12 +462,19 @@ public class AvroTypeUtil {
// If at least one non-null type exists, find the first compatible type
if (nonNullFieldSchemas.size() >= 1) {
for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
- final Object convertedValue = conversion.apply(nonNullFieldSchema);
final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
- if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType)
- // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
- || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
- return convertedValue;
+ try {
+ final Object convertedValue = conversion.apply(nonNullFieldSchema);
+ if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType)
+ // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
+ || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
+ return convertedValue;
+ }
+ } catch (Exception e) {
+ // If failed with one of possible types, continue with the next available option.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 5dc1160..388d52f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -95,6 +95,7 @@
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
+ <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index 6782d33..5c04cfa 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -290,6 +290,49 @@ public class TestAvroReaderWithEmbeddedSchema {
}
}
+ @Test
+ public void testMultipleTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/multiple-types.avsc"));
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // Serialize the records below into an in-memory Avro data file (DataFileWriter over a ByteArrayOutputStream).
+ final byte[] serialized;
+ final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+ final DataFileWriter<GenericRecord> writer = dataFileWriter.create(schema, baos)) {
+ // One record per non-null branch of the union in multiple-types.avsc: int, array<int>, and string.
+ // If a union field has multiple type options, a value should be mapped to the first compatible type.
+ final GenericRecord r1 = new GenericData.Record(schema);
+ r1.put("field", 123);
+
+ final GenericRecord r2 = new GenericData.Record(schema);
+ r2.put("field", Arrays.asList(1, 2, 3));
+
+ final GenericRecord r3 = new GenericData.Record(schema);
+ r3.put("field", "not a number");
+
+ writer.append(r1);
+ writer.append(r2);
+ writer.append(r3);
+ writer.flush();
+
+ serialized = baos.toByteArray();
+ }
+ // Read the records back; each value should come out as the type of the union branch it was written with.
+ try (final InputStream in = new ByteArrayInputStream(serialized)) {
+ final AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in);
+ final RecordSchema recordSchema = reader.getSchema();
+ // A union with more than one non-null branch is expected to surface as a CHOICE field type.
+ assertEquals(RecordFieldType.CHOICE, recordSchema.getDataType("field").get().getFieldType());
+ // The assertions below rely on records being returned in write order (r1, r2, r3).
+ Record record = reader.nextRecord();
+ assertEquals(123, record.getValue("field"));
+ record = reader.nextRecord();
+ assertArrayEquals(new Object[]{1, 2, 3}, (Object[]) record.getValue("field"));
+ record = reader.nextRecord();
+ assertEquals("not a number", record.getValue("field"));
+ }
+ }
+
private Object[] toObjectArray(final byte[] bytes) {
final Object[] array = new Object[bytes.length];
for (int i = 0; i < bytes.length; i++) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc
new file mode 100644
index 0000000..670555e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc
@@ -0,0 +1,16 @@
+{
+ "namespace": "nifi",
+ "name": "data_types",
+ "type": "record",
+ "fields": [
+ {
+ "name": "field",
+ "type": [
+ "null",
+ "int",
+ {"type": "array", "items": "int"},
+ "string"
+ ]
+ }
+ ]
+}
\ No newline at end of file