You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2022/02/19 19:12:38 UTC
[nifi] branch main updated: NIFI-9286: JOLT Expression Language; Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification; NIFI-9286: adding JOLT unit tests; NIFI-9286: addressing PR feedback; Fixes a problem with the scope of the EL for module directory; NIFI-9286: alignment of JOLT processors; NIFI-9286: fix checkstyle
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new dfbf2e3 NIFI-9286: JOLT Expression Language; Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification; NIFI-9286: adding JOLT unit tests; NIFI-9286: addressing PR feedback; Fixes a problem with the scope of the EL for module directory; NIFI-9286: alignment of JOLT processors; NIFI-9286: fix checkstyle
dfbf2e3 is described below
commit dfbf2e3cea5485196c18934d0c01f49103f12727
Author: levilentz <le...@gmail.com>
AuthorDate: Tue Oct 5 14:46:13 2021 -0700
NIFI-9286: JOLT Expression Language
Fixes NIFI-6213 and adds in functionality to use expression language in class and module specification
NIFI-9286: adding JOLT unit tests
NIFI-9286: addressing PR feedback
Fixes a problem with the scope of the EL for module directory
NIFI-9286: alignment of JOLT processors
NIFI-9286: fix checkstyle
This closes #5444
Signed-off-by: Mike Thomsen <mt...@apache.org>
---
.../jolt/record/JoltTransformRecord.java | 47 ++++++++++++-------
.../jolt/record/TestJoltTransformRecord.java | 28 +++++++++++
.../processors/standard/JoltTransformJSON.java | 54 +++++++++++++++-------
.../processors/standard/TestJoltTransformJSON.java | 27 +++++++++++
4 files changed, 123 insertions(+), 33 deletions(-)
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 0b48901..9293e4c 100644
--- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -20,14 +20,15 @@ import com.bazaarvoice.jolt.ContextualTransform;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.JsonUtils;
import com.bazaarvoice.jolt.Transform;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -60,7 +61,6 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -89,6 +89,7 @@ import java.util.stream.Collectors;
@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
+@RequiresInstanceClassLoading
public class JoltTransformRecord extends AbstractProcessor {
static final AllowableValue SHIFTR
@@ -151,8 +152,9 @@ public class JoltTransformRecord extends AbstractProcessor {
.displayName("Custom Transformation Class Name")
.description("Fully Qualified Class Name for Custom Transformation")
.required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(JOLT_SPEC, CUSTOMR)
.build();
static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@@ -163,6 +165,7 @@ public class JoltTransformRecord extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
.dynamicallyModifiesClasspath(true)
+ .dependsOn(JOLT_SPEC, CUSTOMR)
.build();
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
@@ -200,7 +203,7 @@ public class JoltTransformRecord extends AbstractProcessor {
* For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform
* when there is no jolt-record-spec specified).
*/
- private LoadingCache<Optional<String>, JoltTransform> transformCache;
+ private Cache<Optional<String>, JoltTransform> transformCache;
static {
final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -235,7 +238,6 @@ public class JoltTransformRecord extends AbstractProcessor {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
-
if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
if (!SORTR.getValue().equals(transform)) {
final String message = "A specification is required for this transformation";
@@ -247,7 +249,7 @@ public class JoltTransformRecord extends AbstractProcessor {
try {
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
- if (validationContext.isExpressionLanguagePresent(specValue)) {
+ if (validationContext.isExpressionLanguagePresent(specValue) ) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
@@ -265,6 +267,14 @@ public class JoltTransformRecord extends AbstractProcessor {
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
+ } else if (validationContext.isExpressionLanguagePresent(customTransform)) {
+ final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
+ if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+ results.add(new ValidationResult.Builder().valid(false)
+ .subject(CUSTOM_CLASS.getDisplayName())
+ .explanation("Invalid Expression Language: " + invalidExpressionMsg)
+ .build());
+ }
} else {
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson);
}
@@ -273,7 +283,7 @@ public class JoltTransformRecord extends AbstractProcessor {
}
}
} catch (final Exception e) {
- getLogger().info("Processor is not valid - " + e.toString());
+ getLogger().info("Processor is not valid - ", e);
String message = "Specification not valid for the selected transformation.";
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
@@ -443,7 +453,14 @@ public class JoltTransformRecord extends AbstractProcessor {
specString = Optional.empty();
}
- return transformCache.get(specString);
+ return transformCache.get(specString, currString -> {
+ try {
+ return createTransform(context, currString.orElse(null), flowFile);
+ } catch (Exception e) {
+ getLogger().error("Problem getting transform", e);
+ }
+ return null;
+ });
}
@OnScheduled
@@ -451,10 +468,10 @@ public class JoltTransformRecord extends AbstractProcessor {
int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
transformCache = Caffeine.newBuilder()
.maximumSize(maxTransformsToCache)
- .build(specString -> createTransform(context, specString.orElse(null)));
+ .build();
}
- private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
+ private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -463,16 +480,12 @@ public class JoltTransformRecord extends AbstractProcessor {
}
if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
- return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+ return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson);
} else {
return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
}
- protected FilenameFilter getJarFilenameFilter() {
- return (dir, name) -> (name != null && name.endsWith(".jar"));
- }
-
protected static Object transform(JoltTransform joltTransform, Object input) {
return joltTransform instanceof ContextualTransform
? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input);
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
index 6c4188b..288efe0 100644
--- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -583,6 +584,33 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
}
@Test
+ public void testExpressionLanguageJarFile() throws IOException {
+ generateTestData(1, null);
+ final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+ runner.setProperty(writer, "Pretty Print JSON", "true");
+ runner.enableControllerService(writer);
+ URL t = getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar");
+ assert t != null;
+ final String customJarPath = t.getPath();
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json")));
+ final String customJoltTransform = "TestCustomJoltTransform";
+ final String customClass = "TestCustomJoltTransform";
+ runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${JOLT_SPEC}");
+ runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+ runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "${CUSTOM_CLASS}");
+ runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
+ runner.setVariable("CUSTOM_JAR", customJarPath);
+ Map<String, String> customSpecs = new HashMap<>();
+ customSpecs.put("JOLT_SPEC", spec);
+ customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
+ customSpecs.put("CUSTOM_CLASS", customClass);
+ runner.enqueue(new byte[0], customSpecs);
+ runner.assertValid();
+ }
+
+ @Test
public void testJoltSpecEL() throws IOException {
generateTestData(1, null);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index 7c7d649..040a9bd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -18,13 +18,14 @@ package org.apache.nifi.processors.standard;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.JsonUtils;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
@CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
+@RequiresInstanceClassLoading
public class JoltTransformJSON extends AbstractProcessor {
public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift", "Shift input JSON/data to create the output JSON.");
@@ -109,8 +111,9 @@ public class JoltTransformJSON extends AbstractProcessor {
.displayName("Custom Transformation Class Name")
.description("Fully Qualified Class Name for Custom Transformation")
.required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(JOLT_SPEC, CUSTOMR)
.build();
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@@ -119,7 +122,9 @@ public class JoltTransformJSON extends AbstractProcessor {
.description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamicallyModifiesClasspath(true)
+ .dependsOn(JOLT_SPEC, CUSTOMR)
.build();
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
@@ -160,7 +165,7 @@ public class JoltTransformJSON extends AbstractProcessor {
* For some cases the key could be empty. It means that it represents default transform (e.g. for custom transform
* when there is no jolt-record-spec specified).
*/
- private LoadingCache<Optional<String>, JoltTransform> transformCache;
+ private Cache<Optional<String>, JoltTransform> transformCache;
static {
final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -188,8 +193,6 @@ public class JoltTransformJSON extends AbstractProcessor {
return properties;
}
-
-
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
@@ -208,7 +211,7 @@ public class JoltTransformJSON extends AbstractProcessor {
final ClassLoader customClassLoader;
try {
- if (modulePath != null) {
+ if (modulePath != null && !validationContext.isExpressionLanguagePresent(modulePath)) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
customClassLoader = this.getClass().getClassLoader();
@@ -217,13 +220,21 @@ public class JoltTransformJSON extends AbstractProcessor {
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
if (validationContext.isExpressionLanguagePresent(specValue)) {
- final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
+ final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(JOLT_SPEC.getDisplayName())
.explanation("Invalid Expression Language: " + invalidExpressionMsg)
.build());
}
+ } else if (validationContext.isExpressionLanguagePresent(customTransform)) {
+ final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(customTransform, true);
+ if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+ results.add(new ValidationResult.Builder().valid(false)
+ .subject(CUSTOM_CLASS.getDisplayName())
+ .explanation("Invalid Expression Language: " + invalidExpressionMsg)
+ .build());
+ }
} else {
//for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
@@ -242,7 +253,7 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
} catch (final Exception e) {
- getLogger().info("Processor is not valid - " + e.toString());
+ getLogger().error("processor is not valid: ", e);
String message = "Specification not valid for the selected transformation." ;
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
@@ -306,7 +317,7 @@ public class JoltTransformJSON extends AbstractProcessor {
logger.info("Transformed {}", new Object[]{original});
}
- private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
+ private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
@@ -314,7 +325,14 @@ public class JoltTransformJSON extends AbstractProcessor {
specString = Optional.empty();
}
- return transformCache.get(specString);
+ return transformCache.get(specString, currString -> {
+ try {
+ return createTransform(context, currString.orElse(null), flowFile);
+ } catch (Exception e) {
+ getLogger().error("Problem getting transform", e);
+ }
+ return null;
+ });
}
@OnScheduled
@@ -322,11 +340,15 @@ public class JoltTransformJSON extends AbstractProcessor {
int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
transformCache = Caffeine.newBuilder()
.maximumSize(maxTransformsToCache)
- .build(specString -> createTransform(context, specString.orElse(null)));
+ .build();
try {
if (context.getProperty(MODULES).isSet()) {
- customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter());
+ customClassLoader = ClassLoaderUtils.getCustomClassLoader(
+ context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+ this.getClass().getClassLoader(),
+ getJarFilenameFilter()
+ );
} else {
customClassLoader = this.getClass().getClassLoader();
}
@@ -335,7 +357,7 @@ public class JoltTransformJSON extends AbstractProcessor {
}
}
- private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
+ private JoltTransform createTransform(final ProcessContext context, final String specString, final FlowFile flowFile) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -344,7 +366,7 @@ public class JoltTransformJSON extends AbstractProcessor {
}
if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
- return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+ return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(), specJson);
} else {
return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
index c548662..7e68c6b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -362,6 +363,32 @@ public class TestJoltTransformJSON {
}
@Test
+ public void testExpressionLanguageJarFile() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+ final String customJoltTransform = "TestCustomJoltTransform";
+
+ Map<String, String> customSpecs = new HashMap<>();
+ customSpecs.put("JOLT_SPEC", spec);
+ customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, "${JOLT_SPEC}");
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"${CUSTOM_JOLT_CLASS}");
+ runner.setProperty(JoltTransformJSON.MODULES, "${CUSTOM_JAR}");
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+ runner.setVariable("CUSTOM_JAR", customJarPath);
+ runner.enqueue(JSON_INPUT, customSpecs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+ transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+ Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
+ Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
+ assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+ }
+
+ @Test
public void testTransformInputWithCustomTransformationWithDir() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String customJarPath = "src/test/resources/TestJoltTransformJson";