Posted to issues@nifi.apache.org by markap14 <gi...@git.apache.org> on 2018/08/14 18:14:27 UTC

[GitHub] nifi pull request #2833: NIFI-5353: Add JoltTransformRecord processor

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2833#discussion_r210043658
  
    --- Diff: nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.jolt.record;
    +
    +import com.bazaarvoice.jolt.ContextualTransform;
    +import com.bazaarvoice.jolt.JoltTransform;
    +import com.bazaarvoice.jolt.JsonUtils;
    +import com.bazaarvoice.jolt.Transform;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.SystemResource;
    +import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.jolt.record.util.TransformFactory;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.ListRecordSet;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
    +
    +import java.io.FilenameFilter;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"),
    +        @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"),
    +})
    +@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created "
    +        + "with transformed content and is routed to the 'success' relationship. If the transform "
    +        + "fails, the original FlowFile is routed to the 'failure' relationship.")
     +@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "If the Jolt transform is applied to the entire record set, all records are held in memory "
     +        + "at once, so large record sets can exhaust available memory.")
    +public class JoltTransformRecord extends AbstractProcessor {
    +
    +    static final AllowableValue SHIFTR
    +            = new AllowableValue("jolt-transform-shift", "Shift", "Shift input data to create the output.");
    +    static final AllowableValue CHAINR
    +            = new AllowableValue("jolt-transform-chain", "Chain", "Execute list of Jolt transformations.");
    +    static final AllowableValue DEFAULTR
    +            = new AllowableValue("jolt-transform-default", "Default", " Apply default values to the output.");
    +    static final AllowableValue REMOVR
    +            = new AllowableValue("jolt-transform-remove", "Remove", " Remove values from input data to create the output.");
    +    static final AllowableValue CARDINALITY
    +            = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output.");
    +    static final AllowableValue SORTR
    +            = new AllowableValue("jolt-transform-sort", "Sort", "Sort input field name values alphabetically. Any specification set is ignored.");
    +    static final AllowableValue CUSTOMR
    +            = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name");
    +    static final AllowableValue MODIFIER_DEFAULTR
    +            = new AllowableValue("jolt-transform-modify-default", "Modify - Default", "Writes when field name is missing or value is null");
    +    static final AllowableValue MODIFIER_OVERWRITER
    +            = new AllowableValue("jolt-transform-modify-overwrite", "Modify - Overwrite", " Always overwrite value");
    +    static final AllowableValue MODIFIER_DEFINER
    +            = new AllowableValue("jolt-transform-modify-define", "Modify - Define", "Writes when key is missing");
    +
    +    static final AllowableValue APPLY_TO_RECORD_SET
    +            = new AllowableValue("jolt-record-apply-recordset", "Entire Record Set", "Applies the transformation to the record set as a whole. Used when "
    +            + "values from multiple records are needed in the transformation.");
    +    static final AllowableValue APPLY_TO_RECORDS
    +            = new AllowableValue("jolt-record-apply-records", "Each Record", "Applies the transformation to each record individually.");
    +
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("jolt-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("jolt-record-record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
    +            .name("jolt-record-transform")
    +            .displayName("Jolt Transformation DSL")
    +            .description("Specifies the Jolt Transformation that should be used with the provided specification.")
    +            .required(true)
    +            .allowableValues(CARDINALITY, CHAINR, DEFAULTR, MODIFIER_DEFAULTR, MODIFIER_DEFINER, MODIFIER_OVERWRITER, REMOVR, SHIFTR, SORTR, CUSTOMR)
    +            .defaultValue(CHAINR.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
    +            .name("jolt-record-spec")
    +            .displayName("Jolt Specification")
    +            .description("Jolt Specification for transform of record data. This value is ignored if the Jolt Sort Transformation is selected.")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .required(false)
    +            .build();
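     +    // Illustrative only, not part of this PR: the specification may reference FlowFile
     +    // attributes via Expression Language, e.g. a Chain spec such as
     +    //   [ { "operation": "default", "spec": { "region": "${default.region}" } } ]
     +    // where "default.region" is a hypothetical attribute name.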
    +
    +    static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder()
    +            .name("jolt-record-custom-class")
    +            .displayName("Custom Transformation Class Name")
    +            .description("Fully Qualified Class Name for Custom Transformation")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
    +            .name("jolt-record-custom-modules")
    +            .displayName("Custom Module Directory")
    +            .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TRANSFORM_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("jolt-record-transform-strategy")
    +            .displayName("Transformation Strategy")
    +            .description("Specifies whether the transform should be applied to the entire record set or to each individual record. Note that when the transform is applied to "
    +                    + "the entire record set, the first element in the spec should be an asterix (*) in order to match each record.")
    +            .required(true)
    +            .allowableValues(APPLY_TO_RECORD_SET, APPLY_TO_RECORDS)
    +            .defaultValue(APPLY_TO_RECORDS.getValue())
    +            .build();
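     +    // Illustrative only, not part of this PR: with the "Entire Record Set" strategy the
     +    // records are presented to JOLT as a single List, so a Shift spec typically begins
     +    // with "*" to match each record by its index, e.g.
     +    //   [ { "operation": "shift", "spec": { "*": { "id": "[&1].recordId" } } } ]
     +    // With the "Each Record" strategy, the spec would instead start directly at the
     +    // record's own fields, e.g. { "id": "recordId" }.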
    +
    +    static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
    +            .name("jolt-record-transform-cache-size")
    +            .displayName("Transform Cache Size")
    +            .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
    +                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
    +            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .required(true)
    +            .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The FlowFile with transformed content will be routed to this relationship")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship")
    +            .build();
    +
     +    private static final List<PropertyDescriptor> properties;
     +    private static final Set<Relationship> relationships;
     +    private volatile ClassLoader customClassLoader;
     +    private static final String DEFAULT_CHARSET = "UTF-8";
    +
    +    // Cache is guarded by synchronizing on 'this'.
    +    private volatile int maxTransformsToCache = 10;
    +    private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() {
    +        @Override
    +        protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) {
    +            final boolean evict = size() > maxTransformsToCache;
    +            if (evict) {
    +                getLogger().debug("Removing Jolt Transform from cache because cache is full");
    +            }
    +            return evict;
    +        }
    +    };
    +
    +    static {
    +        final List<PropertyDescriptor> _properties = new ArrayList<>();
    +        _properties.add(RECORD_READER);
    +        _properties.add(RECORD_WRITER);
    +        _properties.add(JOLT_TRANSFORM);
    +        _properties.add(CUSTOM_CLASS);
    +        _properties.add(MODULES);
    +        _properties.add(JOLT_SPEC);
    +        _properties.add(TRANSFORM_STRATEGY);
    +        _properties.add(TRANSFORM_CACHE_SIZE);
    +        properties = Collections.unmodifiableList(_properties);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
    +        final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
    +        final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
    +        final String modulePath = validationContext.getProperty(MODULES).isSet() ? validationContext.getProperty(MODULES).getValue() : null;
    +
    +        if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
    +            if (!SORTR.getValue().equals(transform)) {
    +                final String message = "A specification is required for this transformation";
    +                results.add(new ValidationResult.Builder().valid(false)
    +                        .explanation(message)
    +                        .build());
    +            }
    +        } else {
    +            final ClassLoader customClassLoader;
    +
    +            try {
    +                if (modulePath != null) {
    +                    customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
    +                } else {
    +                    customClassLoader = this.getClass().getClassLoader();
    +                }
    +
    +                final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
    +
    +                if (validationContext.isExpressionLanguagePresent(specValue)) {
    +                    final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
    +                    if (!StringUtils.isEmpty(invalidExpressionMsg)) {
    +                        results.add(new ValidationResult.Builder().valid(false)
    +                                .subject(JOLT_SPEC.getDisplayName())
    +                                .explanation("Invalid Expression Language: " + invalidExpressionMsg)
    +                                .build());
    +                    }
    +                } else {
    +                    //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
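     +                    // (each "${" is escaped to "\\${" so that Jolt treats any Expression Language
     +                    //  references as literal text while the spec's syntax is checked)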
    +                    Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
    +
    +                    if (CUSTOMR.getValue().equals(transform)) {
    +                        if (StringUtils.isEmpty(customTransform)) {
     +                            final String customMessage = "A custom transformation class must be provided.";
    +                            results.add(new ValidationResult.Builder().valid(false)
    +                                    .explanation(customMessage)
    +                                    .build());
    +                        } else {
    +                            TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
    +                        }
    +                    } else {
    +                        TransformFactory.getTransform(customClassLoader, transform, specJson);
    +                    }
    +                }
    +            } catch (final Exception e) {
    +                getLogger().info("Processor is not valid - " + e.toString());
    +                String message = "Specification not valid for the selected transformation.";
    +                results.add(new ValidationResult.Builder().valid(false)
    +                        .explanation(message)
    +                        .build());
    +            }
    +        }
    +
    +        return results;
    +    }
    +
    +    @SuppressWarnings("unchecked")
    +    @Override
    +    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
    +        final FlowFile original = session.get();
    +        if (original == null) {
    +            return;
    +        }
    +
    +        final ComponentLog logger = getLogger();
    +        final StopWatch stopWatch = new StopWatch(true);
    +
    +        final String transformStrategy = context.getProperty(TRANSFORM_STRATEGY).getValue();
    +        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    +
    +        final RecordSchema schema;
    +        final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
    +        try (final InputStream in = session.read(original);
    +             final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
    +            schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema());
    +            Record record;
    +
    +            FlowFile transformed = session.create(original);
    +            final Map<String, String> attributes = new HashMap<>();
    +            final WriteResult writeResult;
    +            try (final OutputStream out = session.write(transformed);
    +                 final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
    +
    +                final JoltTransform transform = getTransform(context, original);
    +                if (customClassLoader != null) {
    +                    Thread.currentThread().setContextClassLoader(customClassLoader);
    +                }
    +
    +                if (APPLY_TO_RECORD_SET.getValue().equals(transformStrategy)) {
    +                    List<Map<String, Object>> recordList = new ArrayList<>();
    +                    final RecordSet transformedRecordSet;
    +                    while ((record = reader.nextRecord()) != null) {
    +
    +                        Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
     +                        // JOLT expects arrays to be of type List where our Record code uses Object[].
     +                        // Convert the record map's Object[] values to Lists before handing it to JOLT.
    +                        recordMap = (Map<String, Object>) normalizeJoltObjects(recordMap);
    +                        recordList.add(recordMap);
    +                    }
    +                    Object transformedObjects = transform(transform, recordList);
    +
    +                    // JOLT expects arrays to be of type List where our Record code uses Object[].
    +                    // Make another pass of the transformed objects to change List to Object[].
    +                    if (transformedObjects instanceof Map) {
    +                        // The set of incoming records has been transformed to a single record (Map)
    +                        List<Record> normalizedList = new ArrayList<>();
    +                        normalizedList.add(DataTypeUtils.toRecord(normalizeRecordObjects(transformedObjects), schema, "r"));
    +                        transformedObjects = normalizedList;
    +
    +                    } else if (transformedObjects instanceof List) {
    +                        transformedObjects = ((List) transformedObjects).stream()
    +                                .map(JoltTransformRecord::normalizeRecordObjects)
    +                                .map((o) -> DataTypeUtils.toRecord(o, schema, "r"))
    +                                .collect(Collectors.toList());
    +                    }
    +
    +                    transformedRecordSet = new ListRecordSet(schema, (List<Record>) transformedObjects);
    +                    writeResult = writer.write(transformedRecordSet);
    +
    +                } else {
    +                    writer.beginRecordSet();
    +                    while ((record = reader.nextRecord()) != null) {
    +                        Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
     +                        // JOLT expects arrays to be of type List where our Record code uses Object[].
     +                        // Convert the record map's Object[] values to Lists before handing it to JOLT.
    +                        recordMap = (Map<String, Object>) normalizeJoltObjects(recordMap);
    +                        Object transformedObject = transform(transform, recordMap);
    +                        // JOLT expects arrays to be of type List where our Record code uses Object[].
    +                        // Make another pass of the transformed objects to change List to Object[].
    +                        Record r = DataTypeUtils.toRecord(normalizeRecordObjects(transformedObject), schema, "r");
    +                        writer.write(r);
    +                    }
    +                    writeResult = writer.finishRecordSet();
    +                }
    +
    +                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
    +                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
    +                attributes.putAll(writeResult.getAttributes());
    +            } catch (Exception e) {
    +                logger.error("Unable to write transformed records {} due to {}", new Object[]{original, e.toString(), e});
    +                session.remove(transformed);
    +                session.transfer(original, REL_FAILURE);
    +                return;
    +            }
    +
    +            final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
    +            transformed = session.putAllAttributes(transformed, attributes);
    +            session.transfer(transformed, REL_SUCCESS);
    +            session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +            session.remove(original);
    --- End diff --
    
    It's a bit odd to fork a child and then remove the original. Should probably either transfer the original to an 'original' relationship or just modify the original FlowFile itself, rather than forking a child.
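
    For illustration, a minimal sketch of the first option (the REL_ORIGINAL relationship and its name are hypothetical, not something in this PR):

        static final Relationship REL_ORIGINAL = new Relationship.Builder()
                .name("original")
                .description("The unmodified input FlowFile")
                .build();

        // ... then in onTrigger(), instead of session.remove(original):
        session.transfer(original, REL_ORIGINAL);

    The second option would write the transformed records back over the original content (e.g. via session.write(original, streamCallback)) instead of forking a child, so the provenance lineage stays on a single FlowFile.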


---