You are viewing a plain-text version of this content. (The link to its canonical version was not preserved in this plain-text copy.)
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/06/07 16:04:12 UTC

nifi git commit: NIFI-4029: Allow null Avro default values in HortonworksSchemaRegistry

Repository: nifi
Updated Branches:
  refs/heads/master c86190c51 -> 45e035686


NIFI-4029: Allow null Avro default values in HortonworksSchemaRegistry

This closes #1894.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45e03568
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45e03568
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45e03568

Branch: refs/heads/master
Commit: 45e035686f0a51c29629f6f6dfa1d26496e21997
Parents: c86190c
Author: Steve Champagne <ch...@gmail.com>
Authored: Wed Jun 7 12:36:28 2017 +0000
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Jun 7 12:03:53 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |   6 +-
 .../nifi-hwx-schema-registry-service/pom.xml    |   4 +
 .../hortonworks/HortonworksSchemaRegistry.java  | 122 +------------------
 3 files changed, 13 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/45e03568/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 52c55fc..1417e67 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -152,7 +152,11 @@ public class AvroTypeUtil {
                     final Schema fieldSchema = field.schema();
                     final DataType fieldType = determineDataType(fieldSchema);
 
-                    recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+                    if (field.defaultVal() == JsonProperties.NULL_VALUE) {
+                        recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
+                    } else {
+                        recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+                    }
                 }
 
                 final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);

http://git-wip-us.apache.org/repos/asf/nifi/blob/45e03568/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
index 79dbc84..38e175c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
@@ -43,6 +43,10 @@ limitations under the License.
 		</dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-schema-registry-service-api</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/45e03568/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index d2289a2..3027e5f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -25,12 +25,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -42,10 +39,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.util.Tuple;
@@ -261,7 +254,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
         return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
             final Schema schema = new Schema.Parser().parse(schemaText);
-            return createRecordSchema(schema, schemaText, schemaIdentifier);
+            return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
         });
     }
 
@@ -309,120 +302,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
         return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
             final Schema schema = new Schema.Parser().parse(schemaText);
-            return createRecordSchema(schema, schemaText, schemaIdentifier);
+            return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
         });
     }
 
 
-    /**
-     * Converts an Avro Schema to a RecordSchema
-     *
-     * @param avroSchema the Avro Schema to convert
-     * @param text the textual representation of the schema
-     * @param schemaId the id of the schema
-     * @return the Corresponding Record Schema
-     */
-    private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) {
-        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
-        for (final Field field : avroSchema.getFields()) {
-            final String fieldName = field.name();
-            final DataType dataType = determineDataType(field.schema());
-
-            recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
-        }
-
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", schemaId);
-        return recordSchema;
-    }
-
-    /**
-     * Returns a DataType for the given Avro Schema
-     *
-     * @param avroSchema the Avro Schema to convert
-     * @return a Data Type that corresponds to the given Avro Schema
-     */
-    private DataType determineDataType(final Schema avroSchema) {
-        final Type avroType = avroSchema.getType();
-
-        final LogicalType logicalType = avroSchema.getLogicalType();
-        if (logicalType != null) {
-            final String logicalTypeName = logicalType.getName();
-            switch (logicalTypeName) {
-                case LOGICAL_TYPE_DATE:
-                    return RecordFieldType.DATE.getDataType();
-                case LOGICAL_TYPE_TIME_MILLIS:
-                case LOGICAL_TYPE_TIME_MICROS:
-                    return RecordFieldType.TIME.getDataType();
-                case LOGICAL_TYPE_TIMESTAMP_MILLIS:
-                case LOGICAL_TYPE_TIMESTAMP_MICROS:
-                    return RecordFieldType.TIMESTAMP.getDataType();
-            }
-        }
-
-        switch (avroType) {
-            case ARRAY:
-                return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
-            case BYTES:
-            case FIXED:
-                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case BOOLEAN:
-                return RecordFieldType.BOOLEAN.getDataType();
-            case DOUBLE:
-                return RecordFieldType.DOUBLE.getDataType();
-            case ENUM:
-            case STRING:
-                return RecordFieldType.STRING.getDataType();
-            case FLOAT:
-                return RecordFieldType.FLOAT.getDataType();
-            case INT:
-                return RecordFieldType.INT.getDataType();
-            case LONG:
-                return RecordFieldType.LONG.getDataType();
-            case RECORD: {
-                final List<Field> avroFields = avroSchema.getFields();
-                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
-
-                for (final Field field : avroFields) {
-                    final String fieldName = field.name();
-                    final Schema fieldSchema = field.schema();
-                    final DataType fieldType = determineDataType(fieldSchema);
-                    recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
-                }
-
-                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY);
-                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
-            }
-            case NULL:
-                return RecordFieldType.STRING.getDataType();
-            case MAP:
-                final Schema valueSchema = avroSchema.getValueType();
-                final DataType valueType = determineDataType(valueSchema);
-                return RecordFieldType.MAP.getMapDataType(valueType);
-            case UNION: {
-                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
-                    .filter(s -> s.getType() != Type.NULL)
-                    .collect(Collectors.toList());
-
-                if (nonNullSubSchemas.size() == 1) {
-                    return determineDataType(nonNullSubSchemas.get(0));
-                }
-
-                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
-                for (final Schema subSchema : nonNullSubSchemas) {
-                    final DataType childDataType = determineDataType(subSchema);
-                    possibleChildTypes.add(childDataType);
-                }
-
-                return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
-            }
-        }
-
-        return null;
-    }
-
-
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {
         return schemaFields;
     }
-}
\ No newline at end of file
+}