You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/08/28 18:32:52 UTC

[2/3] nifi git commit: NIFI-5353: Add JoltTransformRecord processor

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
new file mode 100644
index 0000000..313781d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.jolt.record;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestJoltTransformRecord {
+
+    private TestRunner runner;
+    private JoltTransformRecord processor;
+    private MockRecordParser parser;
+    private JsonRecordSetWriter writer;
+
+    @Before
+    public void setup() throws Exception {
+        processor = new JoltTransformRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(JoltTransformRecord.RECORD_READER, "parser");
+        writer = new JsonRecordSetWriter();
+        try {
+            runner.addControllerService("writer", writer);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.setProperty(writer, "Schema Write Strategy", "full-schema-attribute");
+        runner.setProperty(JoltTransformRecord.RECORD_WRITER, "writer");
+        // Each test must set the Schema Access strategy and Schema, and enable the writer CS
+    }
+
+    @Test
+    public void testRelationshipsCreated() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.enqueue(new byte[0]);
+        Set<Relationship> relationships = processor.getRelationships();
+        assertTrue(relationships.contains(JoltTransformRecord.REL_FAILURE));
+        assertTrue(relationships.contains(JoltTransformRecord.REL_SUCCESS));
+        assertTrue(relationships.contains(JoltTransformRecord.REL_ORIGINAL));
+        assertEquals(3, relationships.size());
+    }
+
+    @Test
+    public void testInvalidJOLTSpec() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = "[{}]";
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testIncorrectJOLTSpec() throws IOException {
+        final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testSpecIsNotSet() {
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testSpecIsEmpty() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, StringUtils.EMPTY);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testSpecNotRequired() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SORTR);
+        runner.assertValid();
+    }
+
+    @Test
+    public void testNoFlowFileContent() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testInvalidFlowFileContent() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        parser.failAfter(0);
+        runner.enqueue("invalid json");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(JoltTransformRecord.REL_FAILURE);
+    }
+
+    @Test
+    public void testCustomTransformationWithNoModule() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform");
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testCustomTransformationWithMissingClassName() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar";
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
+        runner.enqueue(new byte[0]);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testCustomTransformationWithInvalidClassPath() throws IOException {
+        final String customJarPath = "src/test/resources/TestJoltTransformRecord/FakeCustomJar.jar";
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform");
+        runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
+        runner.enqueue(new byte[0]);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testCustomTransformationWithInvalidClassName() throws IOException {
+        final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar";
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "FakeCustomJoltTransform");
+        runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR);
+        runner.enqueue(new byte[0]);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testTransformInputWithChainr() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputWithShiftr() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))),
+                new String(transformed.toByteArray()));
+
+    }
+
+    @Test
+    public void testTransformInputWithDefaultr() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputWithRemovr() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.REMOVR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json"))),
+                new String(transformed.toByteArray()));
+
+    }
+
+    @Test
+    public void testTransformInputWithCardinality() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CARDINALITY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json"))),
+                new String(transformed.toByteArray()));
+
+    }
+
+    @Test
+    public void testTransformInputWithSortr() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SORTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputWithDefaultrExpressionLanguage() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR);
+        runner.setVariable("quota", "5");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json"))),
+                new String(transformed.toByteArray()));
+
+    }
+
+    @Test
+    public void testTransformInputWithModifierDefault() throws IOException {
+        generateTestData(1, null);
+        // Input schema = output schema, just modifying values
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/inputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.MODIFIER_DEFAULTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputWithModifierDefine() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.MODIFIER_DEFAULTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputWithModifierOverwrite() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.MODIFIER_DEFAULTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputWithSortrPopulatedSpec() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SORTR);
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, "abcd");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testTransformInputCustomTransformationIgnored() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar";
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform");
+        runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testJoltSpecEL() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = "${joltSpec}";
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR);
+        final Map<String, String> attributes = Collections.singletonMap("joltSpec",
+                "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}");
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))),
+                new String(transformed.toByteArray()));
+    }
+
+    @Test
+    public void testJoltSpecInvalidEL() {
+        final TestRunner runner = TestRunners.newTestRunner(new JoltTransformRecord());
+        final String spec = "${joltSpec:nonExistingFunction()}";
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.enqueue(new byte[0]);
+        runner.assertNotValid();
+    }
+
+    private void generateTestData(int numRecords, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) {
+
+        if (recordGenerator == null) {
+            final RecordSchema primarySchema = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("value", RecordFieldType.INT.getDataType())));
+            final RecordSchema seriesSchema = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))));
+            final RecordSchema qualitySchema = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("value", RecordFieldType.INT.getDataType())));
+            final RecordSchema ratingSchema = new SimpleRecordSchema(Arrays.asList(
+                    new RecordField("primary", RecordFieldType.RECORD.getDataType()),
+                    new RecordField("series", RecordFieldType.RECORD.getDataType()),
+                    new RecordField("quality", RecordFieldType.RECORD.getDataType())
+            ));
+            parser.addSchemaField("rating", RecordFieldType.RECORD);
+
+            for (int i = 0; i < numRecords; i++) {
+                final int index = i;
+
+                Record primaryRecord = new MapRecord(primarySchema, new HashMap<String, Object>() {{
+                    put("value", (10 * index) + 3);
+                }});
+                Record seriesRecord = new MapRecord(seriesSchema, new HashMap<String, Object>() {{
+                    put("value", new Integer[]{(10 * index) + 5, (10 * index) + 4});
+                }});
+                Record qualityRecord = new MapRecord(qualitySchema, new HashMap<String, Object>() {{
+                    put("value", 3);
+                }});
+
+
+                Record ratingRecord = new MapRecord(ratingSchema, new HashMap<String, Object>() {{
+                    put("primary", primaryRecord);
+                    put("series", seriesRecord);
+                    put("quality", qualityRecord);
+                }});
+
+                parser.addRecord(ratingRecord);
+            }
+
+
+        } else {
+            recordGenerator.apply(numRecords, parser);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java
new file mode 100644
index 0000000..016bebf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.jolt.record.util;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.Test;
+
+import com.bazaarvoice.jolt.CardinalityTransform;
+import com.bazaarvoice.jolt.Chainr;
+import com.bazaarvoice.jolt.Defaultr;
+import com.bazaarvoice.jolt.JoltTransform;
+import com.bazaarvoice.jolt.JsonUtils;
+import com.bazaarvoice.jolt.Modifier;
+import com.bazaarvoice.jolt.Removr;
+import com.bazaarvoice.jolt.Shiftr;
+import com.bazaarvoice.jolt.Sortr;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestTransformFactory {
+
+
+    @Test
+    public void testGetChainTransform() throws Exception {
+        final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain", JsonUtils.jsonToObject(chainrSpec));
+        assertTrue(transform instanceof Chainr);
+    }
+
+    @Test
+    public void testGetDefaultTransform() throws Exception {
+        final String defaultrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/defaultrSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-default", JsonUtils.jsonToObject(defaultrSpec));
+        assertTrue(transform instanceof Defaultr);
+    }
+
+    @Test
+    public void testGetSortTransform() throws Exception {
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-sort", null);
+        assertTrue(transform instanceof Sortr);
+    }
+
+    @Test
+    public void testGetShiftTransform() throws Exception {
+        final String shiftrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/shiftrSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-shift", JsonUtils.jsonToObject(shiftrSpec));
+        assertTrue(transform instanceof Shiftr);
+    }
+
+    @Test
+    public void testGetRemoveTransform() throws Exception {
+        final String removrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/removrSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-remove", JsonUtils.jsonToObject(removrSpec));
+        assertTrue(transform instanceof Removr);
+    }
+
+    @Test
+    public void testGetCardinalityTransform() throws Exception {
+        final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/cardrSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-card", JsonUtils.jsonToObject(cardrSpec));
+        assertTrue(transform instanceof CardinalityTransform);
+    }
+
+    @Test
+    public void testGetModifierDefaultTransform() throws Exception {
+        final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/modifierDefaultSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-modify-default", JsonUtils.jsonToObject(cardrSpec));
+        assertTrue(transform instanceof Modifier.Defaultr);
+    }
+
+    @Test
+    public void testGetModifierDefineTransform() throws Exception {
+        final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/modifierDefineSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-modify-define", JsonUtils.jsonToObject(cardrSpec));
+        assertTrue(transform instanceof Modifier.Definr);
+    }
+
+    @Test
+    public void testGetModifierOverwriteTransform() throws Exception {
+        final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/modifierOverwriteSpec.json")));
+        JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-modify-overwrite", JsonUtils.jsonToObject(cardrSpec));
+        assertTrue(transform instanceof Modifier.Overwritr);
+    }
+
+    @Test
+    public void testGetInvalidTransformWithNoSpec() {
+        try {
+            TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain", null);
+        } catch (Exception e) {
+            assertEquals("JOLT Chainr expects a JSON array of objects - Malformed spec.", e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public void testGetCustomTransformation() throws Exception {
+        final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
+        Path jarFilePath = Paths.get("src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar");
+        URL[] urlPaths = new URL[1];
+        urlPaths[0] = jarFilePath.toUri().toURL();
+        ClassLoader customClassLoader = new URLClassLoader(urlPaths, this.getClass().getClassLoader());
+        JoltTransform transform = TransformFactory.getCustomTransform(customClassLoader, "TestCustomJoltTransform", JsonUtils.jsonToObject(chainrSpec));
+        assertNotNull(transform);
+        assertEquals("TestCustomJoltTransform", transform.getClass().getName());
+    }
+
+    @Test
+    public void testGetCustomTransformationNotFound() throws Exception {
+        final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
+        try {
+            TransformFactory.getCustomTransform(this.getClass().getClassLoader(), "TestCustomJoltTransform", chainrSpec);
+        } catch (ClassNotFoundException cnf) {
+            assertEquals("TestCustomJoltTransform", cnf.getLocalizedMessage());
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar
new file mode 100644
index 0000000..b738658
Binary files /dev/null and b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json
new file mode 100644
index 0000000..7910e1a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json
@@ -0,0 +1,13 @@
+[ {
+  "rating" : {
+    "primary" : {
+      "value" : 3
+    },
+    "quality" : {
+      "value" : 3
+    },
+    "series" : {
+      "value" : 5
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc
new file mode 100644
index 0000000..94793b0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "input",
+ "fields": [
+     {"name": "rating", "type": {
+        "name": "ratingRecord", "type": "record", "fields":[
+            { "name": "primary", "type": {
+                "name": "primaryRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "value", "type": "int"}
+                   ]
+                 }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json
new file mode 100644
index 0000000..fc77f18
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json
@@ -0,0 +1,7 @@
+{
+   "rating" : {
+      "series" : {
+          "value" : "ONE"
+      }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json
new file mode 100644
index 0000000..2065f63
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json
@@ -0,0 +1,16 @@
+[ {
+  "Range" : 5,
+  "Rating" : 3,
+  "SecondaryRatings" : {
+    "quality" : {
+      "Id" : "quality",
+      "Range" : 5,
+      "Value" : 3
+    },
+    "series" : {
+      "Id" : "series",
+      "Range" : 5,
+      "Value" : [ 5, 4 ]
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc
new file mode 100644
index 0000000..eebf0ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "output",
+ "fields": [
+     {"name": "Range", "type": "int"},
+     {"name": "Rating", "type": "int"},
+     {"name": "SecondaryRatings", "type": {
+        "name": "SecondaryRatingsRecord", "type": "record", "fields":[
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "Id", "type": ["null", "string"]},
+                    {"name": "Range", "type": ["null", "int"]},
+                    {"name": "Value", "type": ["null", "int"]}
+                   ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "Id", "type": ["null", "string"]},
+                    {"name": "Range", "type": ["null", "int"]},
+                    {"name": "Value", "type": { "type": "array", "items": "int" }}
+                   ]
+                 }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json
new file mode 100644
index 0000000..7884abf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json
@@ -0,0 +1,29 @@
+[
+  {
+    "operation": "shift",
+    "spec": {
+      "rating": {
+        "primary": {
+          "value": "Rating",
+          "max": "RatingRange"
+        },
+        "*": {
+          "max": "SecondaryRatings.&1.Range",
+          "value": "SecondaryRatings.&1.Value",
+          "$": "SecondaryRatings.&1.Id"
+        }
+      }
+    }
+  },
+  {
+    "operation": "default",
+    "spec": {
+      "Range": 5,
+      "SecondaryRatings": {
+        "*": {
+          "Range": 5
+        }
+      }
+    }
+  }
+]

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json
new file mode 100644
index 0000000..eef0933
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json
@@ -0,0 +1,35 @@
+[
+  {
+    "operation":"TestCustomJoltTransform",
+    "spec" :
+  [
+    {
+      "operation": "shift",
+      "spec": {
+        "rating": {
+          "primary": {
+            "value": "Rating",
+            "max": "RatingRange"
+          },
+          "*": {
+            "max": "SecondaryRatings.&1.Range",
+            "value": "SecondaryRatings.&1.Value",
+            "$": "SecondaryRatings.&1.Id"
+          }
+        }
+      }
+    },
+    {
+      "operation": "default",
+      "spec": {
+        "Range": 5,
+        "SecondaryRatings": {
+          "*": {
+            "Range": 5
+          }
+        }
+      }
+    }
+  ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json
new file mode 100644
index 0000000..92c595d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json
@@ -0,0 +1,26 @@
+[ {
+  "RatingRange" : 5,
+  "rating" : {
+    "primary" : {
+      "value" : 3,
+      "MaxLabel" : "High",
+      "MinLabel" : "Low",
+      "DisplayType" : "NORMAL"
+    },
+    "quality" : {
+      "value" : 3,
+      "MaxLabel" : "High",
+      "MinLabel" : "Low",
+      "DisplayType" : "NORMAL"
+    },
+    "series" : {
+      "value" : [ 5, 4 ],
+      "MaxLabel" : "High",
+      "MinLabel" : "Low",
+      "DisplayType" : "NORMAL"
+    },
+    "quota" : {
+      "value" : "5"
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc
new file mode 100644
index 0000000..6b1ab62
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "output",
+ "fields": [
+     {"name": "RatingRange", "type": "int"},
+     {"name": "rating", "type": {
+        "name": "ratingRecord", "type": "record", "fields":[
+            { "name": "primary", "type": {
+                "name": "primaryRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]},
+                    {"name": "MaxLabel", "type": "string"},
+                    {"name": "MinLabel", "type": "string"},
+                    {"name": "DisplayType", "type": "string"}
+                 ]
+                }
+            },
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]},
+                    {"name": "MaxLabel", "type": "string"},
+                    {"name": "MinLabel", "type": "string"},
+                    {"name": "DisplayType", "type": "string"}
+                 ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "value", "type": { "type": "array", "items": "int" }},
+                    {"name": "MaxLabel", "type": "string"},
+                    {"name": "MinLabel", "type": "string"},
+                    {"name": "DisplayType", "type": "string"}
+                   ]
+                 }
+            },
+            { "name": "quota", "type": {
+                "name": "quotaRecord", "type": "record", "fields":[
+                    {"name": "value", "type": "string"}
+                 ]
+                }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json
new file mode 100644
index 0000000..b06f10a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json
@@ -0,0 +1,26 @@
+{
+  "RatingRange" : 5,
+  "rating": {
+    "primary": {
+      "value": 3,
+      "MaxLabel": "High",
+      "MinLabel": "Low",
+      "DisplayType": "NORMAL"
+    },
+    "quality": {
+      "value": 3,
+      "MaxLabel": "High",
+      "MinLabel": "Low",
+      "DisplayType": "NORMAL"
+    },
+    "series": {
+      "value": [5,4],
+      "MaxLabel": "High",
+      "MinLabel": "Low",
+      "DisplayType": "NORMAL"
+    },
+    "quota":{
+      "value": "${quota}"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json
new file mode 100644
index 0000000..81b9784
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json
@@ -0,0 +1,23 @@
+[ {
+  "RatingRange" : 5,
+  "rating" : {
+    "primary" : {
+      "value" : 3,
+      "MaxLabel" : "High",
+      "MinLabel" : "Low",
+      "DisplayType" : "NORMAL"
+    },
+    "quality" : {
+      "value" : 3,
+      "MaxLabel" : "High",
+      "MinLabel" : "Low",
+      "DisplayType" : "NORMAL"
+    },
+    "series" : {
+      "value" : [ 5, 4 ],
+      "MaxLabel" : "High",
+      "MinLabel" : "Low",
+      "DisplayType" : "NORMAL"
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc
new file mode 100644
index 0000000..979280c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "output",
+ "fields": [
+     {"name": "RatingRange", "type": "int"},
+     {"name": "rating", "type": {
+        "name": "ratingRecord", "type": "record", "fields":[
+            { "name": "primary", "type": {
+                "name": "primaryRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]},
+                    {"name": "MaxLabel", "type": "string"},
+                    {"name": "MinLabel", "type": "string"},
+                    {"name": "DisplayType", "type": "string"}
+                 ]
+                }
+            },
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]},
+                    {"name": "MaxLabel", "type": "string"},
+                    {"name": "MinLabel", "type": "string"},
+                    {"name": "DisplayType", "type": "string"}
+                 ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "value", "type": { "type": "array", "items": "int" }},
+                    {"name": "MaxLabel", "type": "string"},
+                    {"name": "MinLabel", "type": "string"},
+                    {"name": "DisplayType", "type": "string"}
+                   ]
+                 }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json
new file mode 100644
index 0000000..2f2ea9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json
@@ -0,0 +1,10 @@
+ {
+   "RatingRange" : 5,
+   "rating": {
+     "*": {
+        "MaxLabel": "High",
+        "MinLabel": "Low",
+        "DisplayType": "NORMAL"
+     }
+   }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json
new file mode 100644
index 0000000..12d85db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json
@@ -0,0 +1,13 @@
+{
+  "rating": {
+    "primary": {
+      "value": 3
+    },
+    "series": {
+      "value": [5,4]
+    },
+    "quality": {
+      "value": 3
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc
new file mode 100644
index 0000000..247a491
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "input",
+ "fields": [
+     {"name": "rating", "type": {
+        "name": "ratingRecord", "type": "record", "fields":[
+            { "name": "primary", "type": {
+                "name": "primaryRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "value", "type": { "type": "array", "items": "int" }}
+                   ]
+                 }
+            },
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json
new file mode 100644
index 0000000..e41ebb4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json
@@ -0,0 +1,13 @@
+[ {
+  "rating" : {
+    "primary" : {
+      "value" : 0
+    },
+    "series" : {
+      "value" : [ 5, 4 ]
+    },
+    "quality" : {
+      "value" : 3
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json
new file mode 100644
index 0000000..a351a98
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json
@@ -0,0 +1,7 @@
+{
+  "rating": {
+    "primary?": {
+      "+value": 0
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json
new file mode 100644
index 0000000..f307120
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json
@@ -0,0 +1,16 @@
+[ {
+  "rating" : {
+    "primary" : {
+      "value" : 3
+    },
+    "series" : {
+      "value" : [ 5, 4 ]
+    },
+    "quality" : {
+      "value" : 3
+    },
+    "question" : {
+      "value" : 0
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc
new file mode 100644
index 0000000..5586c65
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "input",
+ "fields": [
+     {"name": "rating", "type": {
+        "name": "ratingRecord", "type": "record", "fields":[
+            { "name": "primary", "type": {
+                "name": "primaryRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "value", "type": { "type": "array", "items": "int" }}
+                   ]
+                 }
+            },
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "question", "type": {
+                "name": "questionRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json
new file mode 100644
index 0000000..e4d9ec8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json
@@ -0,0 +1,7 @@
+{
+  "rating": {
+    "question": {
+      "value": 0
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json
new file mode 100644
index 0000000..ef773ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json
@@ -0,0 +1,14 @@
+[ {
+  "rating" : {
+    "primary" : {
+      "value" : 3
+    },
+    "quality" : {
+      "value" : 3
+    },
+    "series" : {
+      "series_first" : 5,
+      "value" : [ 5, 4 ]
+    }
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc
new file mode 100644
index 0000000..7cd896c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "input",
+ "fields": [
+     {"name": "rating", "type": {
+        "name": "ratingRecord", "type": "record", "fields":[
+            { "name": "primary", "type": {
+                "name": "primaryRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "value", "type": ["null", "int"]}
+                 ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "series_first", "type": "int"},
+                    {"name": "value", "type": { "type": "array", "items": "int" }}
+                   ]
+                 }
+            }
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json
new file mode 100644
index 0000000..33b5e2a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json
@@ -0,0 +1,7 @@
+{
+  "rating": {
+    "series": {
+      "series_first": "=firstElement(@(1,value))"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json
new file mode 100644
index 0000000..f25a093
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json
@@ -0,0 +1,12 @@
+[ {
+  "Rating" : [ 3, 13 ],
+  "SecondaryRatings" : {
+    "quality" : {
+      "Id" : [ "quality", "quality" ]
+    },
+    "series" : {
+      "Id" : [ "series", "series" ]
+    },
+    "Range" : 5
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc
new file mode 100644
index 0000000..d88b05c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "output",
+ "fields": [
+     {"name": "Rating", "type": {"type": "array", "items": "int"}},
+     {"name": "SecondaryRatings", "type": {
+        "name": "SecondaryRatingsRecord", "type": "record", "fields":[
+            { "name": "quality", "type": {
+                "name": "qualityRecord", "type": "record", "fields":[
+                    {"name": "Id", "type": {"type": "array", "items": "string"}}
+                   ]
+                }
+            },
+            { "name": "series", "type": {
+                "name": "seriesRecord", "type": "record", "fields":[
+                    {"name": "Id", "type": {"type": "array", "items": "string"}}
+                   ]
+                 }
+            },
+            {"name": "Range", "type": "int"}
+        ]
+      }
+    }
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json
new file mode 100644
index 0000000..4496117
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json
@@ -0,0 +1,32 @@
+[
+  {
+    "operation": "shift",
+    "spec": {
+      "*": {
+        "rating": {
+          "primary": {
+            "value": "Rating",
+            "max": "RatingRange"
+          },
+          "*": {
+            "max": "SecondaryRatings.&1.Range",
+            "$": "SecondaryRatings.&1.Id"
+          }
+        }
+      }
+    }
+  },
+  {
+    "operation": "default",
+    "spec": {
+      "*": {
+        "Range": 5,
+        "SecondaryRatings": {
+          "*": {
+            "Range": 5
+          }
+        }
+      }
+    }
+  }
+]

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json
new file mode 100644
index 0000000..391e0b3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json
@@ -0,0 +1,5 @@
+[ {
+  "primary_value" : 3
+}, {
+  "primary_value" : 13
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc
new file mode 100644
index 0000000..ee81ff1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "output",
+ "fields": [
+     {"name": "primary_value", "type": "int"}
+ ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/53969adc/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json
new file mode 100644
index 0000000..e60b42e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json
@@ -0,0 +1,13 @@
+[
+  {
+    "operation": "shift",
+    "spec": {
+      "*": {
+        "rating": {
+          "primary": {
+            "value": "[#4].primary_value"
+          }
+        }
+      }
+    }
+  }]
\ No newline at end of file