You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/06/15 14:06:13 UTC

[nifi] branch main updated: NIFI-8609: Optimized AvroTypeUtil Record creation and conversion

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e06afbd  NIFI-8609: Optimized AvroTypeUtil Record creation and conversion
e06afbd is described below

commit e06afbdd22139a1744e83b46ce1d6e9c78bbdc36
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon May 17 17:35:50 2021 -0400

    NIFI-8609: Optimized AvroTypeUtil Record creation and conversion
    
    Added unit test that is ignored so that it can be manually run for testing performance before/after changes to AvroTypeUtil. Updated AvroTypeUtil to be more efficient by not using Record.getValue() and instead iterating over the Map of values directly. getValue() is less efficient here because we know the RecordFields we are iterating over exist in the schema since they are retrieved from there directly; as a result, any null values still have to be looked up by aliases, but that step [...]
    
    This closes #5080
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    | 59 +++++++++++++++-------
 .../org/apache/nifi/avro/TestAvroTypeUtil.java     | 36 +++++++++++++
 2 files changed, 76 insertions(+), 19 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 2d5ee94..6003e19 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -585,27 +585,37 @@ public class AvroTypeUtil {
         final GenericRecord rec = new GenericData.Record(avroSchema);
         final RecordSchema recordSchema = record.getSchema();
 
-        for (final RecordField recordField : recordSchema.getFields()) {
-            final Object rawValue = record.getValue(recordField);
+        final Map<String, Object> recordValues = record.toMap();
+        for (final Map.Entry<String, Object> entry : recordValues.entrySet()) {
+            final Object rawValue = entry.getValue();
+            if (rawValue == null) {
+                continue;
+            }
 
-            Pair<String, Field> fieldPair = lookupField(avroSchema, recordField);
-            final String fieldName = fieldPair.getLeft();
-            final Field field = fieldPair.getRight();
-            if (field == null) {
+            final String rawFieldName = entry.getKey();
+            final Optional<RecordField> optionalRecordField = recordSchema.getField(rawFieldName);
+            if (!optionalRecordField.isPresent()) {
                 continue;
             }
 
-            final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
-            rec.put(field.name(), converted);
-        }
+            final RecordField recordField = optionalRecordField.get();
 
-        // see if the Avro schema has any fields that aren't in the RecordSchema, and if those fields have a default
-        // value then we want to populate it in the GenericRecord being produced
-        for (final Field field : avroSchema.getFields()) {
-            final Optional<RecordField> recordField = recordSchema.getField(field.name());
-            if (!recordField.isPresent() && rec.get(field.name()) == null && field.defaultVal() != null) {
-                rec.put(field.name(), field.defaultVal());
+            final Field field;
+            final Field avroField = avroSchema.getField(rawFieldName);
+            if (avroField == null) {
+                final Pair<String, Field> fieldPair = lookupField(avroSchema, recordField);
+                field = fieldPair.getRight();
+
+                if (field == null) {
+                    continue;
+                }
+            } else {
+                field = avroField;
             }
+
+            final String fieldName = field.name();
+            final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
+            rec.put(fieldName, converted);
         }
 
         return rec;
@@ -850,6 +860,10 @@ public class AvroTypeUtil {
                     throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
                 }
             case STRING:
+                if (rawValue instanceof String) {
+                    return rawValue;
+                }
+
                 return DataTypeUtils.toString(rawValue, (String) null, charset);
         }
 
@@ -913,12 +927,19 @@ public class AvroTypeUtil {
         // we will have two possible types, and one of them will be null. When this happens, we can be much more efficient by simply
         // determining the non-null type and converting to that.
         final List<Schema> schemaTypes = fieldSchema.getTypes();
-        if (schemaTypes.size() == 2 && (schemaTypes.get(0).getType() == Type.NULL || schemaTypes.get(1).getType() == Type.NULL)) {
-            final Schema nonNullType = schemaTypes.get(0).getType() == Type.NULL ? schemaTypes.get(1) : schemaTypes.get(0);
-            return conversion.apply(nonNullType);
+        if (schemaTypes.size() == 2) {
+            final Schema firstSchema = schemaTypes.get(0);
+            final Schema secondSchema = schemaTypes.get(1);
+
+            if (firstSchema.getType() == Type.NULL) {
+                return conversion.apply(secondSchema);
+            }
+            if (secondSchema.getType() == Type.NULL) {
+                return conversion.apply(firstSchema);
+            }
         }
 
-        Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
+        final Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
                 originalValue,
                 getNonNullSubSchemas(fieldSchema),
                 AvroTypeUtil::determineDataType
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index ccbfff4..b0e4a23 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -71,6 +72,41 @@ import static org.junit.Assert.fail;
 public class TestAvroTypeUtil {
 
     @Test
+    @Ignore("Performance test meant for manually testing only before/after changes in order to measure performance difference caused by changes.")
+    public void testCreateAvroRecordPerformance() throws IOException { // Manual benchmark: times repeated AvroTypeUtil.createAvroRecord calls on one 100-field record and prints elapsed milliseconds per batch.
+        final List<RecordField> fields = new ArrayList<>();
+        for (int i=0; i < 100; i++) { // 100 STRING fields named field0 .. field99
+            fields.add(new RecordField("field" + i, RecordFieldType.STRING.getDataType(), true));
+        }
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); // Avro schema derived once, outside the timed loop
+
+        final Map<String, Object> values = new HashMap<>();
+        for (int i=0; i < 100; i++) {
+            // Populate only even-numbered fields; odd-numbered fields are left absent (null) so the null-value path is exercised too
+            if (i % 2 == 0) {
+                values.put("field" + i, String.valueOf(i));
+            }
+        }
+
+        final MapRecord record = new MapRecord(recordSchema, values);
+
+        final int iterations = 1_000_000; // conversions per timed batch
+
+        for (int j=0; j < 1_000; j++) { // 1,000 batches x 1,000,000 conversions = 10^9 calls in total, so expect a long run
+            final long start = System.currentTimeMillis(); // NOTE(review): System.nanoTime() is the clock intended for elapsed-time measurement
+
+            for (int i = 0; i < iterations; i++) {
+                AvroTypeUtil.createAvroRecord(record, avroSchema); // result intentionally discarded; only the call cost is measured
+            }
+
+            final long millis = System.currentTimeMillis() - start;
+            System.out.println(millis); // elapsed milliseconds for this batch
+        }
+    }
+
+    @Test
     public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));