You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/07 18:03:34 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #4797: NIFI-8111: Add JSLTTransformJSON processor

exceptionfactory commented on code in PR #4797:
URL: https://github.com/apache/nifi/pull/4797#discussion_r990364570


##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-jslt-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-jslt-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.schibsted.spt.data</groupId>
+            <artifactId>jslt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.8.1</version>

Review Comment:
   This version could be upgraded to the latest 2.x release, which is 2.9.3. Version 3 requires Java 11, so 2.9.3 seems to be the best option for now.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);

Review Comment:
   This should be changed to avoid repeating the exception message, and the `Object[]` wrapper should be removed.
   ```suggestion
               logger.error("JSLT Transform failed {}", original, ex);
   ```



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-jslt-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-jslt-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.schibsted.spt.data</groupId>
+            <artifactId>jslt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   This `slf4j-simple` test dependency can be removed.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        logger.debug("Transformed {}", original);
+    }
+
+    @OnStopped
+    @OnShutdown
+    public void onStopped(ProcessContext context) {
+        transformCache.cleanUp();
+    }
+
+    /**
+     * Takes an InputStream and returns the top-level JsonNode returned for the parsed JSON object
+     *
+     * @param in the InputStream from which to read the input
+     * @return the top-level JsonNode for the parsed JSON object
+     * @throws IOException if an error occurs while parsing JSON from the InputStream
+     */
+    private JsonNode readJson(final InputStream in) throws IOException {
+        JsonNode firstJsonNode;
+        try {
+            JsonParser jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            JsonToken token = jsonParser.nextToken();
+            if (token == JsonToken.START_ARRAY) {
+                token = jsonParser.nextToken(); // advance to START_OBJECT token
+            }

Review Comment:
   This approach only reads the first element in an array, which could be confusing behavior. Should the Processor require all incoming JSON to be an Object at the root level? If JSLT does not support transforming array root elements, then it seems better to enforce a standard. It may be worth noting this detail in the description one way or the other.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        logger.debug("Transformed {}", original);
+    }
+
+    @OnStopped
+    @OnShutdown
+    public void onStopped(ProcessContext context) {
+        transformCache.cleanUp();
+    }
+
+    /**
+     * Takes an InputStream and returns the top-level JsonNode returned for the parsed JSON object
+     *
+     * @param in the InputStream from which to read the input
+     * @return the top-level JsonNode for the parsed JSON object
+     * @throws IOException if an error occurs while parsing JSON from the InputStream
+     */
+    private JsonNode readJson(final InputStream in) throws IOException {
+        JsonNode firstJsonNode;
+        try {
+            JsonParser jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);

Review Comment:
   Instead of calling `JsonFactory.createParser()` and setting the codec on the parser, it looks like `ObjectMapper.readTree(InputStream)` could be called to return the root JSON Node directly.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/pom.xml:
##########
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-jslt-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-jslt-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   Is this `nifi-record` dependency used?



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(JSLT_TRANSFORM);
+        descriptors.add(PRETTY_PRINT);
+        descriptors.add(TRANSFORM_CACHE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // If no EL present, pre-compile the script (and report any errors as to mark the processor invalid)
+        if (!validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                Parser.compileString(transform);
+            } catch (JsltException je) {
+                results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(false).explanation("error in transform: " + je.getMessage()).build());
+            }
+        } else {
+            // Expression Language is present, we won't know if the transform is valid until the EL is evaluated
+            results.add(new ValidationResult.Builder().subject(JSLT_TRANSFORM.getDisplayName()).valid(true).build());
+        }
+        return results;
+
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build();
+        // Precompile the transform if it hasn't been done already (and if there is no Expression Language present)
+        if (!context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+            try {
+                transformCache.put(transform, Parser.compileString(transform));
+            } catch (JsltException je) {
+                throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();

Review Comment:
   Instead of serializing the transformed `JsonNode` back to a `String`, the `ObjectMapper.writeValue()` method should be used to serialize the output directly to an `OutputStream` with `ProcessSession.write()`.



##########
nifi-nar-bundles/nifi-jslt-bundle/nifi-jslt-processors/src/main/java/org/apache/nifi/processors/jslt/JSLTTransformJSON.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jslt;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.schibsted.spt.data.jslt.Expression;
+import com.schibsted.spt.data.jslt.JsltException;
+import com.schibsted.spt.data.jslt.Parser;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "jslt", "transform"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
+@CapabilityDescription("Applies a JSLT transformation to the FlowFile JSON payload. A new FlowFile is created "
+        + "with transformed content and is routed to the 'success' relationship. If the JSLT transform "
+        + "fails, the original FlowFile is routed to the 'failure' relationship.")
+public class JSLTTransformJSON extends AbstractProcessor {
+
+    /**
+     * The JSLT transform text. Required and non-empty; Expression Language is evaluated
+     * against FlowFile attributes before the result is compiled as JSLT.
+     */
+    public static final PropertyDescriptor JSLT_TRANSFORM = new PropertyDescriptor.Builder()
+            .name("jslt-transform-transformation")
+            .displayName("JSLT Transformation")
+            .description("JSLT Transformation for transform of JSON data. Any NiFi Expression Language present will be evaluated first to get the final transform to be applied.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    /**
+     * Selects pretty-printed versus compact serialization of the transformed JSON.
+     * Restricted to "true" or "false"; defaults to "false".
+     */
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("jslt-transform-pretty_print")
+            .displayName("Pretty Print")
+            .description("Apply pretty-print formatting to the output of the JSLT transform")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    /**
+     * Maximum number of compiled transforms kept in {@link #transformCache}.
+     * Must be a positive integer; defaults to 1 and does not support Expression Language.
+     */
+    public static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("jslt-transform-cache-size")
+            .displayName("Transform Cache Size")
+            .description("Compiling a JSLT Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+                    + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .required(true)
+            .build();
+
+    /** Destination for FlowFiles whose content was replaced with the transform output. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    /** Destination for FlowFiles that could not be read as JSON or could not be transformed. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
+            .build();
+
+    /** Unmodifiable list of supported properties; assigned once in {@code init}. */
+    private List<PropertyDescriptor> descriptors;
+    /** Unmodifiable set of supported relationships; assigned once in {@code init}. */
+    private Set<Relationship> relationships;
+    // Shared Jackson helpers; presumably used by readJson, which is not visible in this excerpt.
+    private static final JsonFactory jsonFactory = new JsonFactory();
+    private static final ObjectMapper codec = new ObjectMapper();
+
+    /**
+     * A cache for transform objects. It keeps values indexed by JSLT specification string.
+     */
+    private Cache<String, Expression> transformCache;
+
+    /**
+     * Assembles the unmodifiable relationship set and property descriptor list
+     * that this processor exposes.
+     *
+     * @param context initialization context supplied by the framework (not read here)
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> supportedRelationships = new HashSet<>();
+        supportedRelationships.add(REL_SUCCESS);
+        supportedRelationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(supportedRelationships);
+
+        final List<PropertyDescriptor> supportedDescriptors = new ArrayList<>();
+        supportedDescriptors.add(JSLT_TRANSFORM);
+        supportedDescriptors.add(PRETTY_PRINT);
+        supportedDescriptors.add(TRANSFORM_CACHE_SIZE);
+        descriptors = Collections.unmodifiableList(supportedDescriptors);
+    }
+
+    /**
+     * @return the unmodifiable relationship set built in {@code init}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @return the unmodifiable property descriptor list built in {@code init}
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    /**
+     * Extends the inherited validation with a check of the JSLT transform property.
+     * <p>
+     * When the property contains no Expression Language, the transform is compiled and any
+     * {@link JsltException} is reported as an invalid result. When Expression Language is
+     * present, a valid result is recorded instead because the final transform text is not
+     * known until it is evaluated.
+     *
+     * @param validationContext context giving access to the configured property values
+     * @return the inherited validation results plus the result for the transform property
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+        final String subject = JSLT_TRANSFORM.getDisplayName();
+
+        if (validationContext.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            // The transform cannot be checked here: its text depends on the evaluated Expression Language
+            validationResults.add(new ValidationResult.Builder().subject(subject).valid(true).build());
+            return validationResults;
+        }
+
+        // No Expression Language: compile the transform now so that errors mark the processor invalid
+        final String transform = validationContext.getProperty(JSLT_TRANSFORM).getValue();
+        try {
+            Parser.compileString(transform);
+        } catch (JsltException je) {
+            validationResults.add(new ValidationResult.Builder()
+                    .subject(subject)
+                    .valid(false)
+                    .explanation("error in transform: " + je.getMessage())
+                    .build());
+        }
+        return validationResults;
+    }
+
+    /**
+     * Creates the transform cache sized by the Transform Cache Size property and, when the
+     * transform contains no Expression Language, compiles it once and stores it in the cache.
+     *
+     * @param context process context giving access to the configured property values
+     * @throws ProcessException if the static transform fails to compile
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final int cacheCapacity = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder().maximumSize(cacheCapacity).build();
+
+        // With Expression Language present the transform text varies per FlowFile, so nothing can be precompiled
+        if (context.getProperty(JSLT_TRANSFORM).isExpressionLanguagePresent()) {
+            return;
+        }
+
+        final String transform = context.getProperty(JSLT_TRANSFORM).getValue();
+        final Expression compiled;
+        try {
+            compiled = Parser.compileString(transform);
+        } catch (JsltException je) {
+            throw new ProcessException("Error compiling JSLT transform: " + je.getMessage(), je);
+        }
+        transformCache.put(transform, compiled);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        JsonNode firstJsonNode;
+        try (final InputStream in = session.read(original)) {
+            firstJsonNode = readJson(in);
+        } catch (final Exception e) {
+            logger.error("Failed to transform {}; routing to failure", original, e);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonString;
+        final String transform;
+        try {
+            transform = context.getProperty(JSLT_TRANSFORM).evaluateAttributeExpressions(original).getValue();
+            Expression jsltExpression = transformCache.get(transform, currString -> Parser.compileString(transform));
+
+            final JsonNode transformedJson = jsltExpression.apply(firstJsonNode);
+            if (transformedJson == null) {
+                jsonString = "";
+                logger.warn("JSLT transform resulted in no data");
+            } else {
+                jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? transformedJson.toPrettyString() : transformedJson.toString();
+            }
+        } catch (final Exception ex) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString()}, ex);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+
+        transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.transfer(transformed, REL_SUCCESS);
+        session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transform, stopWatch.getElapsed(TimeUnit.MILLISECONDS));

Review Comment:
   Is this call to Provenance Reporter necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org