You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2022/02/19 19:12:38 UTC

[nifi] branch main updated: NIFI-9286: JOLT Expression Language; Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification; NIFI-9286: adding JOLT unit tests; NIFI-9286: addressing PR feedback; Fixes a problem with the scope of the EL for module directory; NIFI-9286: alignment of JOLT processors; NIFI-9286: fix checkstyle

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dfbf2e3  NIFI-9286: JOLT Expression Language; Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification; NIFI-9286: adding JOLT unit tests; NIFI-9286: addressing PR feedback; Fixes a problem with the scope of the EL for module directory; NIFI-9286: alignment of JOLT processors; NIFI-9286: fix checkstyle
dfbf2e3 is described below

commit dfbf2e3cea5485196c18934d0c01f49103f12727
Author: levilentz <le...@gmail.com>
AuthorDate: Tue Oct 5 14:46:13 2021 -0700

    NIFI-9286: JOLT Expression Language
    Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification
    NIFI-9286: adding JOLT unit tests
    NIFI-9286: addressing PR feedback
    Fixes a problem with the scope of the EL for module directory
    NIFI-9286: alignment of JOLT processors
    NIFI-9286: fix checkstyle
    
    This closes #5444
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../jolt/record/JoltTransformRecord.java           | 47 ++++++++++++-------
 .../jolt/record/TestJoltTransformRecord.java       | 28 +++++++++++
 .../processors/standard/JoltTransformJSON.java     | 54 +++++++++++++++-------
 .../processors/standard/TestJoltTransformJSON.java | 27 +++++++++++
 4 files changed, 123 insertions(+), 33 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 0b48901..9293e4c 100644
--- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -20,14 +20,15 @@ import com.bazaarvoice.jolt.ContextualTransform;
 import com.bazaarvoice.jolt.JoltTransform;
 import com.bazaarvoice.jolt.JsonUtils;
 import com.bazaarvoice.jolt.Transform;
+import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -60,7 +61,6 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
 
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -89,6 +89,7 @@ import java.util.stream.Collectors;
 @CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created "
         + "with transformed content and is routed to the 'success' relationship. If the transform "
         + "fails, the original FlowFile is routed to the 'failure' relationship.")
+@RequiresInstanceClassLoading
 public class JoltTransformRecord extends AbstractProcessor {
 
     static final AllowableValue SHIFTR
@@ -151,8 +152,9 @@ public class JoltTransformRecord extends AbstractProcessor {
             .displayName("Custom Transformation Class Name")
             .description("Fully Qualified Class Name for Custom Transformation")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@@ -163,6 +165,7 @@ public class JoltTransformRecord extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
             .dynamicallyModifiesClasspath(true)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
@@ -200,7 +203,7 @@ public class JoltTransformRecord extends AbstractProcessor {
      * For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform
      * when there is no jolt-record-spec specified).
      */
-    private LoadingCache<Optional<String>, JoltTransform> transformCache;
+    private Cache<Optional<String>, JoltTransform> transformCache;
 
     static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -235,7 +238,6 @@ public class JoltTransformRecord extends AbstractProcessor {
         final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
         final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
         final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
-
         if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
             if (!SORTR.getValue().equals(transform)) {
                 final String message = "A specification is required for this transformation";
@@ -247,7 +249,7 @@ public class JoltTransformRecord extends AbstractProcessor {
             try {
                 final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
 
-                if (validationContext.isExpressionLanguagePresent(specValue)) {
+                if (validationContext.isExpressionLanguagePresent(specValue) ) {
                     final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
                     if (!StringUtils.isEmpty(invalidExpressionMsg)) {
                         results.add(new ValidationResult.Builder().valid(false)
@@ -265,6 +267,14 @@ public class JoltTransformRecord extends AbstractProcessor {
                             results.add(new ValidationResult.Builder().valid(false)
                                     .explanation(customMessage)
                                     .build());
+                        } else if (validationContext.isExpressionLanguagePresent(customTransform)) {
+                            final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
+                            if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+                                results.add(new ValidationResult.Builder().valid(false)
+                                        .subject(CUSTOM_CLASS.getDisplayName())
+                                        .explanation("Invalid Expression Language: " + invalidExpressionMsg)
+                                        .build());
+                            }
                         } else {
                             TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson);
                         }
@@ -273,7 +283,7 @@ public class JoltTransformRecord extends AbstractProcessor {
                     }
                 }
             } catch (final Exception e) {
-                getLogger().info("Processor is not valid - " + e.toString());
+                getLogger().info("Processor is not valid - ", e);
                 String message = "Specification not valid for the selected transformation.";
                 results.add(new ValidationResult.Builder().valid(false)
                         .explanation(message)
@@ -443,7 +453,14 @@ public class JoltTransformRecord extends AbstractProcessor {
             specString = Optional.empty();
         }
 
-        return transformCache.get(specString);
+        return transformCache.get(specString, currString -> {
+            try {
+                return createTransform(context, currString.orElse(null), flowFile);
+            } catch (Exception e) {
+                getLogger().error("Problem getting transform", e);
+            }
+            return null;
+        });
     }
 
     @OnScheduled
@@ -451,10 +468,10 @@ public class JoltTransformRecord extends AbstractProcessor {
         int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
         transformCache = Caffeine.newBuilder()
                 .maximumSize(maxTransformsToCache)
-                .build(specString -> createTransform(context, specString.orElse(null)));
+                .build();
     }
 
-    private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
+    private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
         final Object specJson;
         if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
             specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -463,16 +480,12 @@ public class JoltTransformRecord extends AbstractProcessor {
         }
 
         if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+            return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson);
         } else {
             return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
         }
     }
 
-    protected FilenameFilter getJarFilenameFilter() {
-        return (dir, name) -> (name != null && name.endsWith(".jar"));
-    }
-
     protected static Object transform(JoltTransform joltTransform, Object input) {
         return joltTransform instanceof ContextualTransform
                 ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input);
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
index 6c4188b..288efe0 100644
--- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -583,6 +584,33 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
     }
 
     @Test
+    public void testExpressionLanguageJarFile() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        URL t = getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar");
+        assert t != null;
+        final String customJarPath = t.getPath();
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json")));
+        final String customJoltTransform = "TestCustomJoltTransform";
+        final String customClass = "TestCustomJoltTransform";
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${JOLT_SPEC}");
+        runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+        runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "${CUSTOM_CLASS}");
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
+        runner.setVariable("CUSTOM_JAR", customJarPath);
+        Map<String, String> customSpecs = new HashMap<>();
+        customSpecs.put("JOLT_SPEC", spec);
+        customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
+        customSpecs.put("CUSTOM_CLASS", customClass);
+        runner.enqueue(new byte[0], customSpecs);
+        runner.assertValid();
+    }
+
+    @Test
     public void testJoltSpecEL() throws IOException {
         generateTestData(1, null);
         final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index 7c7d649..040a9bd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -18,13 +18,14 @@ package org.apache.nifi.processors.standard;
 
 import com.bazaarvoice.jolt.JoltTransform;
 import com.bazaarvoice.jolt.JsonUtils;
+import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
 @CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created "
         + "with transformed content and is routed to the 'success' relationship. If the JSON transform "
         + "fails, the original FlowFile is routed to the 'failure' relationship.")
+@RequiresInstanceClassLoading
 public class JoltTransformJSON extends AbstractProcessor {
 
     public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift", "Shift input JSON/data to create the output JSON.");
@@ -109,8 +111,9 @@ public class JoltTransformJSON extends AbstractProcessor {
             .displayName("Custom Transformation Class Name")
             .description("Fully Qualified Class Name for Custom Transformation")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@@ -119,7 +122,9 @@ public class JoltTransformJSON extends AbstractProcessor {
             .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
             .required(false)
             .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dynamicallyModifiesClasspath(true)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
@@ -160,7 +165,7 @@ public class JoltTransformJSON extends AbstractProcessor {
      * For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform
      * when there is no jolt-record-spec specified).
      */
-    private LoadingCache<Optional<String>, JoltTransform> transformCache;
+    private Cache<Optional<String>, JoltTransform> transformCache;
 
     static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -188,8 +193,6 @@ public class JoltTransformJSON extends AbstractProcessor {
         return properties;
     }
 
-
-
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
@@ -208,7 +211,7 @@ public class JoltTransformJSON extends AbstractProcessor {
             final ClassLoader customClassLoader;
 
             try {
-                if (modulePath != null) {
+                if (modulePath != null && !validationContext.isExpressionLanguagePresent(modulePath)) {
                     customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
                 } else {
                     customClassLoader =  this.getClass().getClassLoader();
@@ -217,13 +220,21 @@ public class JoltTransformJSON extends AbstractProcessor {
                 final String specValue =  validationContext.getProperty(JOLT_SPEC).getValue();
 
                 if (validationContext.isExpressionLanguagePresent(specValue)) {
-                    final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
+                    final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
                     if (!StringUtils.isEmpty(invalidExpressionMsg)) {
                         results.add(new ValidationResult.Builder().valid(false)
                                 .subject(JOLT_SPEC.getDisplayName())
                                 .explanation("Invalid Expression Language: " + invalidExpressionMsg)
                                 .build());
                     }
+                } else if (validationContext.isExpressionLanguagePresent(customTransform)) {
+                    final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
+                    if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+                        results.add(new ValidationResult.Builder().valid(false)
+                                .subject(CUSTOM_CLASS.getDisplayName())
+                                .explanation("Invalid Expression Language: " + invalidExpressionMsg)
+                                .build());
+                    }
                 } else {
                     //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
                     Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
@@ -242,7 +253,7 @@ public class JoltTransformJSON extends AbstractProcessor {
                     }
                 }
             } catch (final Exception e) {
-                getLogger().info("Processor is not valid - " + e.toString());
+                getLogger().error("processor is not valid: ", e);
                 String message = "Specification not valid for the selected transformation." ;
                 results.add(new ValidationResult.Builder().valid(false)
                         .explanation(message)
@@ -306,7 +317,7 @@ public class JoltTransformJSON extends AbstractProcessor {
         logger.info("Transformed {}", new Object[]{original});
     }
 
-    private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
+    private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) {
         final Optional<String> specString;
         if (context.getProperty(JOLT_SPEC).isSet()) {
             specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
@@ -314,7 +325,14 @@ public class JoltTransformJSON extends AbstractProcessor {
             specString = Optional.empty();
         }
 
-        return transformCache.get(specString);
+        return transformCache.get(specString, currString -> {
+            try {
+                return createTransform(context, currString.orElse(null), flowFile);
+            } catch (Exception e) {
+                getLogger().error("Problem getting transform", e);
+            }
+            return null;
+        });
     }
 
     @OnScheduled
@@ -322,11 +340,15 @@ public class JoltTransformJSON extends AbstractProcessor {
         int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
         transformCache = Caffeine.newBuilder()
                 .maximumSize(maxTransformsToCache)
-                .build(specString -> createTransform(context, specString.orElse(null)));
+                .build();
 
         try {
             if (context.getProperty(MODULES).isSet()) {
-                customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter());
+                customClassLoader = ClassLoaderUtils.getCustomClassLoader(
+                        context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+                        this.getClass().getClassLoader(),
+                        getJarFilenameFilter()
+                );
             } else {
                 customClassLoader = this.getClass().getClassLoader();
             }
@@ -335,7 +357,7 @@ public class JoltTransformJSON extends AbstractProcessor {
         }
     }
 
-    private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
+    private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
         final Object specJson;
         if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
             specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -344,7 +366,7 @@ public class JoltTransformJSON extends AbstractProcessor {
         }
 
         if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+            return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson);
         } else {
             return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
index c548662..7e68c6b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -362,6 +363,32 @@ public class TestJoltTransformJSON {
     }
 
     @Test
+    public void testExpressionLanguageJarFile() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+        final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+        final String customJoltTransform = "TestCustomJoltTransform";
+
+        Map<String, String> customSpecs = new HashMap<>();
+        customSpecs.put("JOLT_SPEC", spec);
+        customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
+        runner.setProperty(JoltTransformJSON.JOLT_SPEC, "${JOLT_SPEC}");
+        runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"${CUSTOM_JOLT_CLASS}");
+        runner.setProperty(JoltTransformJSON.MODULES, "${CUSTOM_JAR}");
+        runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+        runner.setVariable("CUSTOM_JAR", customJarPath);
+        runner.enqueue(JSON_INPUT, customSpecs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+        Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
+        Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
+        assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+    }
+
+    @Test
     public void testTransformInputWithCustomTransformationWithDir() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
         final String customJarPath = "src/test/resources/TestJoltTransformJson";