Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/08 22:50:52 UTC

[GitHub] [nifi] mattyb149 opened a new pull request, #4797: NIFI-8111: Add JSLTTransformJSON processor

mattyb149 opened a new pull request, #4797:
URL: https://github.com/apache/nifi/pull/4797

   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   Provides an alternative to JoltTransformJSON using [JSLT](https://github.com/schibsted/jslt) as the transformation language. The JOLT DSL can have a steep learning curve, whereas JSLT is inspired by other popular tools and languages such as `jq`, `XPath` and `XQuery`. The resulting NAR is under 2 MB and only has Jackson as a dependency, so I included it in the assembly.
   
   In order to streamline the review of the contribution, we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [x] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [x] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [x] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r973422286


##########
nifi-assembly/pom.xml:
##########
@@ -862,6 +862,12 @@ language governing permissions and limitations under the License. -->
             <version>1.18.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-jslt-nar</artifactId>
+            <version>1.16.0-SNAPSHOT</version>

Review Comment:
   Looks like this needs to be updated to `1.18.0-SNAPSHOT`.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the flowfile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", new Object[]{original}, e);

Review Comment:
   The wrapping `Object[]` should be removed.
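The comment points at the varargs style: `ComponentLog` declares a varargs overload `error(String msg, Object... os)`, so the call can be written as `logger.error("Failed to transform {}; routing to failure", original, e);` and the compiler builds the array. The sketch below is a hypothetical `VarargsLoggingSketch.format` helper, not NiFi code; it assumes the SLF4J-style convention that a trailing `Throwable` is treated as the cause rather than as a placeholder value.

```java
import java.util.Arrays;

public class VarargsLoggingSketch {

    // Hypothetical formatter, NOT NiFi's ComponentLog implementation: fills "{}"
    // placeholders in order and treats a trailing Throwable as the cause (SLF4J-style).
    public static String format(final String msg, final Object... args) {
        Object[] params = args;
        Throwable cause = null;
        // Peel off a trailing Throwable so it is not consumed by a {} placeholder
        if (args.length > 0 && args[args.length - 1] instanceof Throwable) {
            cause = (Throwable) args[args.length - 1];
            params = Arrays.copyOf(args, args.length - 1);
        }
        final StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int start = 0;
        int idx;
        while ((idx = msg.indexOf("{}", start)) >= 0) {
            sb.append(msg, start, idx);
            // Leave the placeholder as-is if there are fewer arguments than placeholders
            sb.append(argIndex < params.length ? String.valueOf(params[argIndex++]) : "{}");
            start = idx + 2;
        }
        sb.append(msg.substring(start));
        if (cause != null) {
            sb.append(" [cause: ").append(cause.getMessage()).append(']');
        }
        return sb.toString();
    }

    public static void main(final String[] args) {
        final Object flowFile = "FlowFile[uuid=1234]"; // hypothetical stand-in for the FlowFile
        final Exception e = new IllegalStateException("bad JSON");
        // Arguments are passed directly; no explicit Object[] wrapper is needed
        System.out.println(format("Failed to transform {}; routing to failure", flowFile, e));
        // prints: Failed to transform FlowFile[uuid=1234]; routing to failure [cause: bad JSON]
    }
}
```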



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven

Review Comment:
   The `@EventDriven` annotation should be removed.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the flowfile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", new Object[]{original}, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.info("JSLT transform resulted in no data!");

Review Comment:
   Log message should not include an exclamation mark. It is probably better to remove this line. Should this throw an exception, or at least log a warning?
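The hunk above compiles transforms on demand through `transformCache.get(transform, currString -> Parser.compileString(transform))`, bounded by the `Transform Cache Size` property. The minimal sketch below, using a hypothetical `TransformCacheSketch` class, shows that compile-on-miss idea without Caffeine: an access-ordered `LinkedHashMap` acts as an LRU stand-in for `Caffeine.newBuilder().maximumSize(n)` (Caffeine's real eviction policy is Window TinyLFU, not strict LRU), and a `Function` stands in for `Parser.compileString`. It passes the cache key to the compiler, which is equivalent to the lambda above because the key is the transform string itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the PR's Caffeine cache: a bounded LRU map keyed by the
// (EL-evaluated) transform string that compiles only on a cache miss.
public class TransformCacheSketch<V> {
    private final Map<String, V> cache;
    private int compileCount = 0;

    public TransformCacheSketch(final int maxSize) {
        // accessOrder=true: iteration order is least-recently-accessed first
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, V> eldest) {
                // Evict the least recently used entry once the bound is exceeded
                return size() > maxSize;
            }
        };
    }

    // Mirrors cache.get(key, mappingFunction): compile with the key itself on a miss
    public synchronized V get(final String transform, final Function<String, V> compiler) {
        V compiled = cache.get(transform); // a hit also refreshes recency
        if (compiled == null) {
            compiled = compiler.apply(transform);
            compileCount++;
            cache.put(transform, compiled);
        }
        return compiled;
    }

    public synchronized int compileCount() {
        return compileCount;
    }

    public synchronized int cachedEntries() {
        return cache.size();
    }

    public static void main(final String[] args) {
        // String::length is a placeholder for Parser::compileString
        final TransformCacheSketch<Integer> cache = new TransformCacheSketch<>(1);
        cache.get(".a", String::length);
        cache.get(".a", String::length); // hit: no recompilation
        cache.get(".b", String::length); // miss: evicts ".a" because the bound is 1
        System.out.println(cache.compileCount() + " compilations, " + cache.cachedEntries() + " cached");
        // prints: 2 compilations, 1 cached
    }
}
```

In this LRU sketch, with the default size of 1, an Expression Language value that alternates between two transforms would recompile on every FlowFile; a larger `Transform Cache Size` trades memory for fewer compilations.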




[GitHub] [nifi] exceptionfactory closed pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #4797: NIFI-8111: Add JSLTTransformJSON processor
URL: https://github.com/apache/nifi/pull/4797



[GitHub] [nifi] mattyb149 commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r990452350


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        logger.debug("Transformed {}", original);
+    }
+
+    @OnStopped
+    @OnShutdown
+    public void onStopped(ProcessContext context) {
+        transformCache.cleanUp();
+    }
+
+    /**
+     * Takes an InputStream and returns the top-level JsonNode returned for the parsed JSON object
+     *
+     * @param in the InputStream from which to read the input
+     * @return the top-level JsonNode for the parsed JSON object
+     * @throws IOException if an error occurs while parsing JSON from the InputStream
+     */
+    private JsonNode readJson(final InputStream in) throws IOException {
+        JsonNode firstJsonNode;
+        try {
+            JsonParser jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            JsonToken token = jsonParser.nextToken();
+            if (token == JsonToken.START_ARRAY) {
+                token = jsonParser.nextToken(); // advance to START_OBJECT token
+            }

Review Comment:
   Great catch! I want it to support array root elements like JoltTransformJSON does. One difference between JoltTransformJSON and JoltTransformRecord is that with the former you write the transform for the entire input (so if you want to iterate over records you need to put that `*` in the spec), whereas with the latter you write the transform for each object (record). I want the same behavior for JSLTTransformJSON, and someday I would like to add a JSLTTransformRecord in the same vein. I will update this (and the above comment) to apply the transform to the entire input and add a unit test with a top-level array.
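   The distinction drawn in this comment (whole-input transforms, as in JoltTransformJSON, versus per-record transforms, as in JoltTransformRecord) can be sketched without NiFi, Jolt or JSLT. The class and method names below (TransformScopeDemo, applyToWholeInput, applyPerRecord, nameOf, namesOfAll, record) are hypothetical, and JSON is modeled with plain Java types (Map for objects, List for arrays) so the sketch runs on a bare JDK 8; it is an illustration of the two semantics, not the processor's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration, not NiFi or JSLT code. JSON is modeled with plain
// Java types: Map for objects, List for arrays.
public class TransformScopeDemo {

    // Whole-input semantics (as in JoltTransformJSON): the transform receives the
    // root unchanged, so a top-level array must be iterated by the transform itself.
    static Object applyToWholeInput(Function<Object, Object> transform, Object root) {
        return transform.apply(root);
    }

    // Per-record semantics (as in JoltTransformRecord): each element of a top-level
    // array is transformed on its own; a lone object is treated as a single record.
    static Object applyPerRecord(Function<Object, Object> transform, Object root) {
        if (root instanceof List) {
            List<Object> results = new ArrayList<>();
            for (Object record : (List<?>) root) {
                results.add(transform.apply(record));
            }
            return results;
        }
        return transform.apply(root);
    }

    // Per-record transform: pull "name" out of one object.
    static Object nameOf(Object record) {
        return ((Map<?, ?>) record).get("name");
    }

    // Whole-input transform: iterates the array itself, roughly what a JSLT
    // expression such as [for (.) .name] does for a top-level array.
    static Object namesOfAll(Object root) {
        List<Object> names = new ArrayList<>();
        for (Object record : (List<?>) root) {
            names.add(nameOf(record));
        }
        return names;
    }

    // Builds a one-field object {"name": name}.
    static Map<String, Object> record(String name) {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("name", name);
        return map;
    }

    public static void main(String[] args) {
        List<Object> input = Arrays.asList(record("a"), record("b"));
        System.out.println(applyToWholeInput(TransformScopeDemo::namesOfAll, input)); // [a, b]
        System.out.println(applyPerRecord(TransformScopeDemo::nameOf, input));        // [a, b]
    }
}
```

   Both calls produce the same output, but under whole-input semantics the iteration lives in the transform (the `*` in a Jolt spec), while under per-record semantics the caller iterates and the transform only ever sees one object.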



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mattyb149 commented on pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on PR #4797:
URL: https://github.com/apache/nifi/pull/4797#issuecomment-1241334742

   Had to force-push this because of the irrelevant file changes, and I also rebased against the latest main. Hopefully this is good to go after a final review, @Lehel44 @MikeThomsen @exceptionfactory. Thanks!




[GitHub] [nifi] github-actions[bot] commented on pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #4797:
URL: https://github.com/apache/nifi/pull/4797#issuecomment-1176879260

   We're marking this PR as stale due to lack of updates in the past few months. If after another couple of weeks the stale label has not been removed, this PR will be closed. This stale marker and the eventual auto-close do not indicate a judgement of the PR, just a lack of reviewer bandwidth, and help us keep the PR queue more manageable. If you would like this PR re-opened, you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth, which in turn helps yours.




[GitHub] [nifi] mattyb149 commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r990320485


##########
nifi-assembly/pom.xml:
##########
@@ -862,6 +862,12 @@ language governing permissions and limitations under the License. -->
             <version>1.18.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-jslt-nar</artifactId>
+            <version>1.16.0-SNAPSHOT</version>

Review Comment:
   Will update to 1.19.0-SNAPSHOT





[GitHub] [nifi] github-actions[bot] closed pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #4797: NIFI-8111: Add JSLTTransformJSON processor
URL: https://github.com/apache/nifi/pull/4797




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r997380391


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final ObjectMapper jsonObjectMapper = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        try {
+            final String transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            final ObjectWriter writer = context.getProperty(PRETTY_PRINT).asBoolean() ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer();
+            final Object outputObject;
+            if (transformedJson == null || transformedJson.isNull()) {
+                logger.warn("JSLT transform resulted in no data");
+                outputObject = null;
+            } else {
+                outputObject = transformedJson;
+            }
+            FlowFile transformed = session.write(original, out -> {
+                if (outputObject != null) {
+                    writer.writeValue(out, outputObject);
+                }
+            });
+            transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+            session.transfer(transformed, REL_SUCCESS);
+            session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));

Review Comment:
   Thanks for the explanation @mattyb149!
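   The quoted onScheduled/onTrigger code compiles the transform once when no Expression Language is present, and otherwise looks up compiled expressions in a Caffeine cache keyed by the evaluated transform text and bounded by Transform Cache Size. As a JDK-only sketch of that idea, the hypothetical CompiledTransformCache below swaps Caffeine for a LinkedHashMap in access order (strict LRU). Caffeine uses its own size-based eviction policy rather than strict LRU, so exact eviction choices can differ; the compiler function stands in for Parser.compileString.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stdlib stand-in for the processor's Caffeine-backed transformCache:
// compiled transforms are keyed by the final (EL-evaluated) transform text, and the
// least recently used entry is evicted once maxSize is exceeded.
public class CompiledTransformCache<T> {
    private final Function<String, T> compiler;
    private final Map<String, T> entries;
    private int compileCount = 0;

    public CompiledTransformCache(int maxSize, Function<String, T> compiler) {
        // Mirrors the positive-integer validation on Transform Cache Size
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.compiler = compiler;
        // accessOrder=true keeps the least recently accessed entry first
        this.entries = new LinkedHashMap<String, T>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
                return size() > maxSize;
            }
        };
    }

    // synchronized because onTrigger may run on several threads concurrently
    public synchronized T get(String transformText) {
        T compiled = entries.get(transformText);
        if (compiled == null) {
            compiled = compiler.apply(transformText); // the expensive step being cached
            compileCount++;
            entries.put(transformText, compiled);
        }
        return compiled;
    }

    public synchronized int compileCount() {
        return compileCount;
    }

    public synchronized int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        // Stand-in "compiler"; the processor would call Parser.compileString instead
        CompiledTransformCache<String> cache = new CompiledTransformCache<>(1, text -> "compiled:" + text);
        cache.get("{\"id\": .id}");
        cache.get("{\"id\": .id}");     // hit: no recompilation
        cache.get("{\"name\": .name}"); // miss: evicts the first entry
        cache.get("{\"id\": .id}");     // miss again: recompiled
        System.out.println(cache.compileCount()); // 3
    }
}
```

   With a size of 1 (the property's default), this stand-in recompiles whenever the evaluated transform changes, so a larger value may help when Expression Language yields a small set of distinct transforms.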





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r991413387


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final ObjectMapper jsonObjectMapper = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        try {
+            final String transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            final ObjectWriter writer = context.getProperty(PRETTY_PRINT).asBoolean() ? jsonObjectMapper.writerWithDefaultPrettyPrinter() : jsonObjectMapper.writer();
+            final Object outputObject;
+            if (transformedJson == null || transformedJson.isNull()) {
+                logger.warn("JSLT transform resulted in no data");
+                outputObject = null;
+            } else {
+                outputObject = transformedJson;
+            }
+            FlowFile transformed = session.write(original, out -> {
+                if (outputObject != null) {
+                    writer.writeValue(out, outputObject);
+                }
+            });
+            transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+            session.transfer(transformed, REL_SUCCESS);
+            session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));

Review Comment:
   Is this manual call to the Provenance Reporter still necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mattyb149 commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r997309166


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,254 @@
+            session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));

Review Comment:
   A number of processors make their own `modifyContent` call in order to record a provenance message with more context. I copied this one from JoltTransformJSON, so I kept it for consistency.
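The manual call passes two things to the provenance event: a details string (`"Modified With " + transform`) and the processing time measured by the `StopWatch` started at the top of `onTrigger`. The stdlib-only sketch below imitates just that bookkeeping so it can run outside NiFi. `ProvenanceDetailsSketch` and `ElapsedTimer` are hypothetical names; `ElapsedTimer` is a stand-in for `org.apache.nifi.util.StopWatch`, and in the processor the values go to `session.getProvenanceReporter().modifyContent(transformed, details, millis)`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the data the processor hands to modifyContent
class ProvenanceDetailsSketch {

    // Stand-in for NiFi's StopWatch(true): starts timing on construction
    static final class ElapsedTimer {
        private final long startNanos = System.nanoTime();

        long getElapsed(TimeUnit unit) {
            return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
        }
    }

    // Same details message format as the processor's manual provenance call
    static String details(String transform) {
        return "Modified With " + transform;
    }

    public static void main(String[] args) throws InterruptedException {
        ElapsedTimer timer = new ElapsedTimer();
        Thread.sleep(5); // simulate the transform work
        long millis = timer.getElapsed(TimeUnit.MILLISECONDS);
        System.out.println(details("{\"id\": .id}") + " (" + millis + " ms)");
    }
}
```

Embedding the full transform text in the details string makes the event self-describing, at the cost of potentially long provenance entries when the transform is large.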





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r990364570


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-jslt-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-jslt-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.schibsted.spt.data</groupId>
+            <artifactId>jslt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.8.1</version>

Review Comment:
   This version could be upgraded to the latest 2.x release, 2.9.3. Version 3 requires Java 11, so 2.9.3 seems to be the best option for now.
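The Caffeine dependency backs the `TRANSFORM_CACHE_SIZE` property: compiled JSLT expressions are keyed by the (possibly EL-evaluated) transform text, bounded by `maximumSize`, and compiled on a miss via `transformCache.get(transform, ...)`. For readers unfamiliar with Caffeine, the stdlib-only sketch below shows the same get-or-compile contract with a hypothetical `BoundedTransformCache`. It swaps in plain LRU eviction through `LinkedHashMap`, which is not Caffeine's policy (Caffeine uses the frequency-aware Window TinyLFU), so treat it as an illustration rather than a drop-in replacement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical size-bounded cache of compiled transforms, keyed by transform text
final class BoundedTransformCache<T> {
    private final Map<String, T> entries;

    BoundedTransformCache(int maximumSize) {
        if (maximumSize < 1) {
            throw new IllegalArgumentException("maximumSize must be positive");
        }
        // accessOrder = true keeps least-recently-used entries first
        this.entries = new LinkedHashMap<String, T>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
                // Evict the LRU entry once the bound is exceeded
                return size() > maximumSize;
            }
        };
    }

    // Returns the cached value, compiling and storing it on a miss.
    // synchronized because a processor may run with several concurrent tasks.
    synchronized T get(String transform, Function<String, T> compiler) {
        return entries.computeIfAbsent(transform, compiler);
    }

    synchronized int cachedCount() {
        return entries.size();
    }
}
```

With the property's default size of 1, a transform that varies per FlowFile (through Expression Language) is recompiled whenever consecutive FlowFiles evaluate to different transform text; a larger value trades memory for fewer recompilations.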



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);

Review Comment:
   This should be changed to avoid repeating the exception message, and the `Object[]` wrapper should be removed.
   ```suggestion
               logger.error("JSLT Transform failed {}", original, ex);
   ```
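   To illustrate the duplication, here is a toy model using only the JDK. The `LogLineModel.render` helper is hypothetical and is not NiFi's `ComponentLog` or SLF4J's implementation. It models a common logging convention: a trailing `Throwable` argument is attached to the log entry instead of filling a `{}` placeholder. Arguments are treated as one flat list, and the attached exception is rendered after ` | `. With the original call, the exception text appears twice. With the suggested call, it appears once.

   ```java
   final class LogLineModel {
       // Toy model only: fills "{}" placeholders in order and, if the last
       // argument is a Throwable, attaches it to the rendered line instead of
       // using it as a placeholder value.
       static String render(String pattern, Object... args) {
           Throwable attached = null;
           int valueCount = args.length;
           if (valueCount > 0 && args[valueCount - 1] instanceof Throwable) {
               attached = (Throwable) args[valueCount - 1];
               valueCount--; // the trailing Throwable is not a placeholder value
           }
           StringBuilder out = new StringBuilder();
           int argIndex = 0;
           int from = 0;
           int at;
           while ((at = pattern.indexOf("{}", from)) >= 0) {
               out.append(pattern, from, at);
               // Leave the placeholder as-is if there are no values left
               out.append(argIndex < valueCount ? String.valueOf(args[argIndex++]) : "{}");
               from = at + 2;
           }
           out.append(pattern, from, pattern.length());
           if (attached != null) {
               out.append(" | ").append(attached); // stand-in for the logged stack trace
           }
           return out.toString();
       }
   }
   ```

   For example, `render("Unable to transform {} due to {}", "FF-1", ex.toString(), ex)` includes the exception text twice. By comparison, `render("JSLT Transform failed {}", "FF-1", ex)` includes it only once.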



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-jslt-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-jslt-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.schibsted.spt.data</groupId>
+            <artifactId>jslt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   This dependency can be removed.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        logger.debug("Transformed {}", original);
+    }
+
+    @OnStopped
+    @OnShutdown
+    public void onStopped(ProcessContext context) {
+        transformCache.cleanUp();
+    }
+
+    /**
+     * Takes an InputStream and returns the top-level JsonNode returned for the parsed JSON object
+     *
+     * @param in the InputStream from which to read the input
+     * @return the top-level JsonNode for the parsed JSON object
+     * @throws IOException if an error occurs while parsing JSON from the InputStream
+     */
+    private JsonNode readJson(final InputStream in) throws IOException {
+        JsonNode firstJsonNode;
+        try {
+            JsonParser jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            JsonToken token = jsonParser.nextToken();
+            if (token == JsonToken.START_ARRAY) {
+                token = jsonParser.nextToken(); // advance to START_OBJECT token
+            }

Review Comment:
   When the input is an array, this approach only reads its first element, which could be confusing. Should the Processor require all incoming JSON to have an Object at the root level? If JSLT does not support transforming array root elements, it seems better to enforce that requirement explicitly. Either way, it may be worth noting this behavior in the description.
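   A minimal sketch of the stricter option, using only the JDK. The helpers `rootKind` and `requireObjectRoot` are hypothetical. They look at the first non-whitespace character to classify the root as an object, an array, or something else, and they reject any root that is not an object. The sketch only inspects the root token and does not validate the rest of the document. Inside the processor, the equivalent check on a parsed Jackson tree would be `JsonNode.isObject()`.

   ```java
   final class JsonRootCheck {
       /** Kind of value at the JSON root, judged from its first non-whitespace character. */
       enum RootKind { OBJECT, ARRAY, OTHER, EMPTY }

       static RootKind rootKind(String json) {
           for (int i = 0; i < json.length(); i++) {
               char c = json.charAt(i);
               if (c == ' ' || c == '\t' || c == '\n' || c == '\r') {
                   continue; // skip JSON insignificant whitespace
               }
               if (c == '{') {
                   return RootKind.OBJECT;
               }
               if (c == '[') {
                   return RootKind.ARRAY;
               }
               return RootKind.OTHER; // scalar root (number, string, true/false/null) or invalid input
           }
           return RootKind.EMPTY;
       }

       /** Hypothetical guard: reject any input whose root is not a JSON object. */
       static void requireObjectRoot(String json) {
           RootKind kind = rootKind(json);
           if (kind != RootKind.OBJECT) {
               throw new IllegalArgumentException("Expected a JSON object at the root but found " + kind);
           }
       }
   }
   ```

   Failing fast like this would route an array-rooted FlowFile to `failure` with a clear reason, instead of silently transforming only its first element.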



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        logger.debug("Transformed {}", original);
+    }
+
+    @OnStopped
+    @OnShutdown
+    public void onStopped(ProcessContext context) {
+        transformCache.cleanUp();
+    }
+
+    /**
+     * Takes an InputStream and returns the top-level JsonNode returned for the parsed JSON object
+     *
+     * @param in the InputStream from which to read the input
+     * @return the top-level JsonNode for the parsed JSON object
+     * @throws IOException if an error occurs while parsing JSON from the InputStream
+     */
+    private JsonNode readJson(final InputStream in) throws IOException {
+        JsonNode firstJsonNode;
+        try {
+            JsonParser jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);

Review Comment:
   Instead of calling `JsonFactory.createParser()`, `ObjectMapper.readTree(InputStream)` could be used to read the root JSON node directly.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-jslt-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-jslt-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   Is this dependency used?



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();

Review Comment:
   Instead of serializing the transformed `JsonNode` back to a `String`, the `ObjectMapper.writeValue()` method should be used to serialize the output directly to an `OutputStream` with `ProcessSession.write()`.
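A minimal sketch of the suggested approach, assuming Jackson 2.x. The helper `writeJson` and the class `WriteJsonSketch` are hypothetical names; in the processor the `OutputStream` would be the one handed to the `session.write(original, out -> ...)` callback, which this standalone example replaces with a `ByteArrayOutputStream`:

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class WriteJsonSketch {

    // Keep Jackson from closing the target stream; the stream passed to a
    // session.write() callback is managed by the framework, not by this code
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);

    /**
     * Hypothetical helper: serializes the transformed node straight to the
     * output stream, with no intermediate String or byte[] copy.
     */
    static void writeJson(final JsonNode node, final boolean prettyPrint, final OutputStream out) throws IOException {
        final ObjectWriter writer = prettyPrint ? MAPPER.writerWithDefaultPrettyPrinter() : MAPPER.writer();
        // Jackson encodes to UTF-8 when writing to an OutputStream
        writer.writeValue(out, node);
    }

    public static void main(String[] args) throws IOException {
        final JsonNode node = MAPPER.createObjectNode().put("a", 1);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeJson(node, false, out);
        System.out.println(out.toString(StandardCharsets.UTF_8.name()));
    }
}
```

Choosing an `ObjectWriter` per call keeps the Pretty Print property a simple boolean switch while reusing the shared, thread-safe mapper.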



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));

Review Comment:
   Is this call to Provenance Reporter necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] mattyb149 commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r990455957


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   No, it's a remnant from when I tried to add both this and a JSLTTransformRecord processor. Good catch!


