You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/14 22:04:52 UTC
nifi git commit: NIFI-2020 - initial commit for custom transformation
support
Repository: nifi
Updated Branches:
refs/heads/master 1da18a3f4 -> 048ba5366
NIFI-2020 - initial commit for custom transformation support
NIFI-2020 - updates to use lambdas/stream wherever possible and fix potential nullpointer issue.
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #564
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/048ba536
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/048ba536
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/048ba536
Branch: refs/heads/master
Commit: 048ba5366c7c4155e990c42f0e602b34076fc205
Parents: 1da18a3
Author: Yolanda M. Davis <yo...@gmail.com>
Authored: Wed Jun 22 22:26:16 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Thu Jul 14 18:04:14 2016 -0400
----------------------------------------------------------------------
.../util/file/classloader/ClassLoaderUtils.java | 66 ++++++++++
.../file/classloader/TestClassLoaderUtils.java | 73 +++++++++++
.../TestClassLoaderUtils/TestSuccess.jar | Bin 0 -> 827 bytes
.../transformjson/TransformJSONResource.java | 43 ++++++-
.../transformjson/dto/JoltSpecificationDTO.java | 18 +++
.../transformjson/transformjson.controller.js | 20 ++-
.../TestTransformJSONResource.java | 76 +++++++++++
.../TestCustomJoltTransform.jar | Bin 0 -> 1025 bytes
.../src/main/resources/META-INF/NOTICE | 5 +
.../nifi-standard-processors/pom.xml | 1 +
.../processors/standard/JoltTransformJSON.java | 101 +++++++++++++--
.../additionalDetails.html | 3 +
.../src/test/java/TestCustomJoltTransform.java | 41 ++++++
.../standard/TestJoltTransformJSON.java | 128 +++++++++++++++++++
.../TestCustomJoltTransform.jar | Bin 0 -> 1025 bytes
.../TestJoltTransformJson/customChainrSpec.json | 35 +++++
.../nifi-standard-utils/pom.xml | 5 +
.../standard/util/TransformFactory.java | 83 +++++++++++-
.../standard/util/TestTransformFactory.java | 56 +++++---
.../TestCustomJoltTransform.jar | Bin 0 -> 1025 bytes
20 files changed, 716 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
new file mode 100644
index 0000000..4d084ec
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.classloader;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ClassLoaderUtils {
+
+ public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
+ String[] modules = modulePath != null? modulePath.split(",") : null;
+ URL[] classpaths = getURLsForClasspath(modules,filenameFilter);
+ return createModuleClassLoader(classpaths,parentClassLoader);
+ }
+
+ protected static URL[] getURLsForClasspath(String[] modulePaths, FilenameFilter filenameFilter) throws MalformedURLException {
+ List<URL> additionalClasspath = new LinkedList<>();
+ if (modulePaths != null) {
+ for (String modulePathString : modulePaths) {
+ File modulePath = new File(modulePathString);
+
+ if (modulePath.exists()) {
+
+ additionalClasspath.add(modulePath.toURI().toURL());
+
+ if (modulePath.isDirectory()) {
+ File[] files = modulePath.listFiles(filenameFilter);
+
+ if (files != null) {
+ for (File jarFile : files) {
+ additionalClasspath.add(jarFile.toURI().toURL());
+ }
+ }
+ }
+ } else {
+ throw new MalformedURLException("Path specified does not exist");
+ }
+ }
+ }
+ return additionalClasspath.toArray(new URL[additionalClasspath.size()]);
+ }
+
+ protected static ClassLoader createModuleClassLoader(URL[] modules,ClassLoader parentClassLoader) {
+ return new URLClassLoader(modules, parentClassLoader);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
new file mode 100644
index 0000000..cf47770
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.classloader;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.MalformedURLException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestClassLoaderUtils {
+
+ @Test
+ public void testGetCustomClassLoader() throws MalformedURLException,ClassNotFoundException{
+ final String jarFilePath = "src/test/resources/TestClassLoaderUtils";
+ ClassLoader customClassLoader = ClassLoaderUtils.getCustomClassLoader(jarFilePath ,this.getClass().getClassLoader(), getJarFilenameFilter());
+ assertTrue(customClassLoader != null);
+ assertTrue(customClassLoader.loadClass("TestSuccess") != null);
+ }
+
+ @Test
+ public void testGetCustomClassLoaderNoPathSpecified() throws MalformedURLException{
+ final ClassLoader originalClassLoader = this.getClass().getClassLoader();
+ ClassLoader customClassLoader = ClassLoaderUtils.getCustomClassLoader(null,originalClassLoader, getJarFilenameFilter());
+ assertTrue(customClassLoader != null);
+ try{
+ customClassLoader.loadClass("TestSuccess");
+ }catch (ClassNotFoundException cex){
+ assertTrue(cex.getLocalizedMessage().equals("TestSuccess"));
+ return;
+ }
+ fail("exception did not occur, class should not be found");
+ }
+
+ @Test
+ public void testGetCustomClassLoaderWithInvalidPath() {
+ final String jarFilePath = "src/test/resources/FakeTestClassLoaderUtils/TestSuccess.jar";
+ try {
+ ClassLoaderUtils.getCustomClassLoader(jarFilePath, this.getClass().getClassLoader(), getJarFilenameFilter());
+ }catch(MalformedURLException mex){
+ assertTrue(mex.getLocalizedMessage().equals("Path specified does not exist"));
+ return;
+ }
+ fail("exception did not occur, path should not exist");
+ }
+
+ protected FilenameFilter getJarFilenameFilter(){
+ return new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return (name != null && name.endsWith(".jar"));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-commons/nifi-utils/src/test/resources/TestClassLoaderUtils/TestSuccess.jar
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/resources/TestClassLoaderUtils/TestSuccess.jar b/nifi-commons/nifi-utils/src/test/resources/TestClassLoaderUtils/TestSuccess.jar
new file mode 100755
index 0000000..b28612a
Binary files /dev/null and b/nifi-commons/nifi-utils/src/test/resources/TestClassLoaderUtils/TestSuccess.jar differ
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/TransformJSONResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/TransformJSONResource.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/TransformJSONResource.java
index 861c120..3e79ccc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/TransformJSONResource.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/TransformJSONResource.java
@@ -17,6 +17,9 @@
package org.apache.nifi.web.standard.api.transformjson;
+import java.io.File;
+import java.io.FilenameFilter;
+
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -24,6 +27,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.nifi.processors.standard.util.TransformFactory;
import org.apache.nifi.web.standard.api.AbstractStandardResource;
import org.apache.nifi.web.standard.api.transformjson.dto.JoltSpecificationDTO;
@@ -53,10 +57,8 @@ public class TransformJSONResource extends AbstractStandardResource {
@Path("/validate")
public Response validateSpec(JoltSpecificationDTO specificationDTO) {
- Object specJson = getSpecificationJsonObject(specificationDTO.getSpecification());
-
try {
- TransformFactory.getTransform(specificationDTO.getTransform(), specJson);
+ getTransformation(specificationDTO);
}catch(final Exception e){
logger.error("Validation Failed - " + e.toString());
return Response.ok(new ValidationDTO(false,"Validation Failed - Please verify the provided specification.")).build();
@@ -70,10 +72,8 @@ public class TransformJSONResource extends AbstractStandardResource {
@Path("/execute")
public Response executeSpec(JoltSpecificationDTO specificationDTO) {
- Object specJson = getSpecificationJsonObject(specificationDTO.getSpecification());
-
try {
- Transform transform = TransformFactory.getTransform(specificationDTO.getTransform(), specJson);
+ Transform transform = getTransformation(specificationDTO);
Object inputJson = JsonUtils.jsonToObject(specificationDTO.getInput());
return Response.ok(JsonUtils.toJsonString(transform.transform(inputJson))).build();
@@ -84,6 +84,37 @@ public class TransformJSONResource extends AbstractStandardResource {
}
+ protected Transform getTransformation(JoltSpecificationDTO specificationDTO) throws Exception{
+
+ Object specJson = getSpecificationJsonObject(specificationDTO.getSpecification());
+ String transformName = specificationDTO.getTransform();
+ String modules = specificationDTO.getModules();
+
+ ClassLoader classLoader = null;
+ Transform transform ;
+ if(modules != null && !modules.isEmpty()){
+ classLoader = ClassLoaderUtils.getCustomClassLoader(specificationDTO.getModules(),this.getClass().getClassLoader(), getJarFilenameFilter());
+ } else{
+ classLoader = this.getClass().getClassLoader();
+ }
+
+ if(transformName.equals("jolt-transform-custom")) {
+ transform = TransformFactory.getCustomTransform(classLoader,specificationDTO.getCustomClass(), specJson);
+ }else{
+ transform = TransformFactory.getTransform(classLoader,specificationDTO.getTransform(), specJson);
+ }
+
+ return transform;
+ }
+
+ protected FilenameFilter getJarFilenameFilter(){
+ return new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return (name != null && name.endsWith(".jar"));
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/dto/JoltSpecificationDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/dto/JoltSpecificationDTO.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/dto/JoltSpecificationDTO.java
index d189198..268d665 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/dto/JoltSpecificationDTO.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/java/org/apache/nifi/web/standard/api/transformjson/dto/JoltSpecificationDTO.java
@@ -28,6 +28,8 @@ public class JoltSpecificationDTO implements Serializable{
private String transform;
private String specification;
private String input;
+ private String modules;
+ private String customClass;
public JoltSpecificationDTO() {
}
@@ -60,4 +62,20 @@ public class JoltSpecificationDTO implements Serializable{
public void setInput(String input) {
this.input = input;
}
+
+ public String getModules() {
+ return modules;
+ }
+
+ public void setModules(String modules) {
+ this.modules = modules;
+ }
+
+ public String getCustomClass() {
+ return customClass;
+ }
+
+ public void setCustomClass(String customClass) {
+ this.customClass = customClass;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/webapp/app/transformjson/transformjson.controller.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/webapp/app/transformjson/transformjson.controller.js b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/webapp/app/transformjson/transformjson.controller.js
index 4c8b637..ec40cc6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/webapp/app/transformjson/transformjson.controller.js
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/main/webapp/app/transformjson/transformjson.controller.js
@@ -53,6 +53,20 @@ var TransformJsonController = function ($scope, $state, $q, TransformJsonService
details['descriptors']['jolt-transform']['defaultValue'] ;
};
+ $scope.getCustomClass = function(details){
+ if(details['properties']['jolt-custom-class'] != null && details['properties']['jolt-custom-class'] != "") {
+ return details['properties']['jolt-custom-class'];
+ }
+ else return '';
+ };
+
+ $scope.getCustomModules = function(details){
+ if(details['properties']['jolt-custom-modules'] != null && details['properties']['jolt-custom-modules'] != "") {
+ return details['properties']['jolt-custom-modules'];
+ }
+ else return '';
+ };
+
$scope.getTransformOptions = function(details){
return $scope.convertToArray(details['descriptors']['jolt-transform']['allowableValues']);
};
@@ -61,6 +75,8 @@ var TransformJsonController = function ($scope, $state, $q, TransformJsonService
$scope.jsonSpec = $scope.getJsonSpec(details);
$scope.transform = $scope.getTransform(details);
$scope.transformOptions = $scope.getTransformOptions(details);
+ $scope.customClass = $scope.getCustomClass(details);
+ $scope.modules = $scope.getCustomModules(details);
};
$scope.populateScopeWithDetails(details.data);
@@ -243,7 +259,9 @@ var TransformJsonController = function ($scope, $state, $q, TransformJsonService
return {
"transform": transform,
"specification" : jsonSpec,
- "input" : jsonInput
+ "input" : jsonInput,
+ "customClass" : $scope.customClass,
+ "modules": $scope.modules
};
};
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/java/org/apache/nifi/web/standard/api/transformjson/TestTransformJSONResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/java/org/apache/nifi/web/standard/api/transformjson/TestTransformJSONResource.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/java/org/apache/nifi/web/standard/api/transformjson/TestTransformJSONResource.java
index ba00a15..739b9f9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/java/org/apache/nifi/web/standard/api/transformjson/TestTransformJSONResource.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/java/org/apache/nifi/web/standard/api/transformjson/TestTransformJSONResource.java
@@ -136,6 +136,82 @@ public class TestTransformJSONResource extends JerseyTest {
assertTrue(validation.isValid());
}
+ @Test
+ public void testValidateWithCustomSpec() {
+
+ final NiFiWebConfigurationContext niFiWebConfigurationContext = mock(NiFiWebConfigurationContext.class);
+ final Map<String,String> properties = new HashMap<>();
+ properties.put("jolt-transform","jolt-transform-custom");
+ final ComponentDetails componentDetails = new ComponentDetails.Builder().properties(properties).build();
+ Mockito.when(servletContext.getAttribute(Mockito.anyString())).thenReturn(niFiWebConfigurationContext);
+ Mockito.when(niFiWebConfigurationContext.getComponentDetails(any(NiFiWebRequestContext.class))).thenReturn(componentDetails);
+
+ JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
+ joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
+ joltSpecificationDTO.setModules("src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar");
+ ValidationDTO validate = client().resource(getBaseURI()).path("/standard/transformjson/validate").post(ValidationDTO.class, joltSpecificationDTO);
+ assertNotNull(validate);
+ assertTrue(validate.isValid());
+ }
+
+ @Test
+ public void testValidateWithCustomSpecEmptyModule() {
+
+ final NiFiWebConfigurationContext niFiWebConfigurationContext = mock(NiFiWebConfigurationContext.class);
+ final Map<String,String> properties = new HashMap<>();
+ properties.put("jolt-transform","jolt-transform-custom");
+ final ComponentDetails componentDetails = new ComponentDetails.Builder().properties(properties).build();
+ Mockito.when(servletContext.getAttribute(Mockito.anyString())).thenReturn(niFiWebConfigurationContext);
+ Mockito.when(niFiWebConfigurationContext.getComponentDetails(any(NiFiWebRequestContext.class))).thenReturn(componentDetails);
+ JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
+ joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
+ ValidationDTO validate = client().resource(getBaseURI()).path("/standard/transformjson/validate").post(ValidationDTO.class, joltSpecificationDTO);
+ assertNotNull(validate);
+ assertTrue(!validate.isValid());
+ }
+
+ @Test
+ public void testValidateWithCustomInvalidSpec() {
+
+ final NiFiWebConfigurationContext niFiWebConfigurationContext = mock(NiFiWebConfigurationContext.class);
+ final Map<String,String> properties = new HashMap<>();
+ properties.put("jolt-transform","jolt-transform-custom");
+ final ComponentDetails componentDetails = new ComponentDetails.Builder().properties(properties).build();
+ Mockito.when(servletContext.getAttribute(Mockito.anyString())).thenReturn(niFiWebConfigurationContext);
+ Mockito.when(niFiWebConfigurationContext.getComponentDetails(any(NiFiWebRequestContext.class))).thenReturn(componentDetails);
+
+ JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}");
+ joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
+ joltSpecificationDTO.setModules("src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar");
+ ValidationDTO validate = client().resource(getBaseURI()).path("/standard/transformjson/validate").post(ValidationDTO.class, joltSpecificationDTO);
+ assertNotNull(validate);
+ assertTrue(!validate.isValid());
+ }
+
+ @Test
+ public void testExecuteWithValidCustomSpec() {
+ final Diffy diffy = new Diffy();
+ JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
+ String inputJson = "{\"rating\":{\"quality\":2,\"count\":1}}";
+ joltSpecificationDTO.setInput(inputJson);
+ joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
+ joltSpecificationDTO.setModules("src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar");
+ String responseString = client().resource(getBaseURI()).path("/standard/transformjson/execute").post(String.class, joltSpecificationDTO);
+ Object transformedJson = JsonUtils.jsonToObject(responseString);
+ Object compareJson = JsonUtils.jsonToObject("{\"rating\":{\"quality\":2,\"count\":1}, \"custom-id\": 4}");
+ assertNotNull(transformedJson);
+ assertTrue(diffy.diff(compareJson, transformedJson).isEmpty());
+ }
+
+ @Test
+ public void testExecuteWithValidCustomSpecEmptyModule() {
+ JoltSpecificationDTO joltSpecificationDTO = new JoltSpecificationDTO("jolt-transform-custom","[{ \"operation\": \"default\", \"spec\":{ \"custom-id\" :4 }}]");
+ String inputJson = "{\"rating\":{\"quality\":2,\"count\":1}}";
+ joltSpecificationDTO.setInput(inputJson);
+ joltSpecificationDTO.setCustomClass("TestCustomJoltTransform");
+ exception.expect(UniformInterfaceException.class);
+ client().resource(getBaseURI()).path("/standard/transformjson/execute").post(String.class, joltSpecificationDTO);
+ }
@Test
public void testExecuteWithInvalidSpec() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar
new file mode 100644
index 0000000..b738658
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-jolt-transform-json-ui/src/test/resources/TestTransformJSONResource/TestCustomJoltTransform.jar differ
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index e723250..6554f3f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -1,6 +1,11 @@
nifi-standard-nar
Copyright 2014-2016 The Apache Software Foundation
+This includes derived works from the Apache Software License V2 library Jolt (https://github.com/bazaarvoice/jolt)
+Copyright 2013-2014 Bazaarvoice, Inc
+The derived work is adapted from com.bazaarvoice.jolt.chainr.ChainrBuilder.java, com.bazaarvoice.jolt.chainr.spec.ChainrSpec.java, com.bazaarvoice.jolt.chainr.spec.ChainrEntry.java and can be found in the org.apache.nifi.processors.standard.util.TransformFactory.java class.
+
+
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 69ddbb9..9aa5dca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -316,6 +316,7 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestSplitText/6.txt</exclude>
<exclude>src/test/resources/TestJoltTransformJson/input.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/chainrSpec.json</exclude>
+ <exclude>src/test/resources/TestJoltTransformJson/customChainrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/chainrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/cardrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformJson/cardrOutput.json</exclude>
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index 3406b1f..cf1d8eb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.standard;
-
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -51,6 +51,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.nifi.processors.standard.util.TransformFactory;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.StreamUtils;
@@ -77,13 +78,14 @@ public class JoltTransformJSON extends AbstractProcessor {
public static final AllowableValue REMOVR = new AllowableValue("jolt-transform-remove", "Remove", " Remove values from input data to create the output JSON.");
public static final AllowableValue CARDINALITY = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output JSON.");
public static final AllowableValue SORTR = new AllowableValue("jolt-transform-sort", "Sort", "Sort input json key values alphabetically. Any specification set is ignored.");
+ public static final AllowableValue CUSTOMR = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name");
public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
.name("jolt-transform")
.displayName("Jolt Transformation DSL")
.description("Specifies the Jolt Transformation that should be used with the provided specification.")
.required(true)
- .allowableValues(CARDINALITY, CHAINR, DEFAULTR, REMOVR, SHIFTR, SORTR)
+ .allowableValues(CARDINALITY, CHAINR, DEFAULTR, REMOVR, SHIFTR, SORTR,CUSTOMR)
.defaultValue(CHAINR.getValue())
.build();
@@ -95,6 +97,24 @@ public class JoltTransformJSON extends AbstractProcessor {
.required(false)
.build();
+ public static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder()
+ .name("jolt-custom-class")
+ .displayName("Custom Transformation Class Name")
+ .description("Fully Qualified Class Name for Custom Transformation")
+ .required(false)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
+ .name("jolt-custom-modules")
+ .displayName("Custom Module Directory")
+ .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
+ .required(false)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The FlowFile with transformed content will be routed to this relationship")
@@ -107,12 +127,15 @@ public class JoltTransformJSON extends AbstractProcessor {
private final static List<PropertyDescriptor> properties;
private final static Set<Relationship> relationships;
private volatile Transform transform;
+ private volatile ClassLoader customClassLoader;
private final static String DEFAULT_CHARSET = "UTF-8";
static{
final List<PropertyDescriptor> _properties = new ArrayList<>();
_properties.add(JOLT_TRANSFORM);
+ _properties.add(CUSTOM_CLASS);
+ _properties.add(MODULES);
_properties.add(JOLT_SPEC);
properties = Collections.unmodifiableList(_properties);
@@ -137,8 +160,10 @@ public class JoltTransformJSON extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
- String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
- String specValue = validationContext.getProperty(JOLT_SPEC).isSet() ? validationContext.getProperty(JOLT_SPEC).getValue() : null;
+ final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
+ final String specValue = validationContext.getProperty(JOLT_SPEC).isSet() ? validationContext.getProperty(JOLT_SPEC).getValue() : null;
+ final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
+ final String modulePath = validationContext.getProperty(MODULES).isSet()? validationContext.getProperty(MODULES).getValue() : null;
if(StringUtils.isEmpty(specValue)){
if(!SORTR.getValue().equals(transform)) {
@@ -147,10 +172,35 @@ public class JoltTransformJSON extends AbstractProcessor {
.explanation(message)
.build());
}
+
} else {
+
+ final ClassLoader customClassLoader;
try {
+
+ if(modulePath != null) {
+ customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
+ }else{
+ customClassLoader = this.getClass().getClassLoader();
+ }
+
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue, DEFAULT_CHARSET);
- TransformFactory.getTransform(transform, specJson);
+
+ if(CUSTOMR.getValue().equals(transform)){
+
+ if (StringUtils.isEmpty(customTransform)){
+ final String customMessage = "A custom transformation class should be provided. ";
+ results.add(new ValidationResult.Builder().valid(false)
+ .explanation(customMessage)
+ .build());
+ }else{
+ TransformFactory.getCustomTransform(customClassLoader,customTransform, specJson);
+ }
+
+ }else {
+ TransformFactory.getTransform(customClassLoader, transform, specJson);
+ }
+
} catch (final Exception e) {
getLogger().info("Processor is not valid - " + e.toString());
String message = "Specification not valid for the selected transformation." ;
@@ -182,8 +232,14 @@ public class JoltTransformJSON extends AbstractProcessor {
});
final String jsonString;
+ final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
+
+ if(customClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(customClassLoader);
+ }
+
final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent);
final Object inputJson = JsonUtils.jsonToObject(bais);
final Object transformedJson = transform.transform(inputJson);
@@ -193,6 +249,11 @@ public class JoltTransformJSON extends AbstractProcessor {
logger.error("Unable to transform {} due to {}", new Object[]{original, re});
session.transfer(original, REL_FAILURE);
return;
+
+ }finally {
+ if(customClassLoader != null && originalContextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(originalContextClassLoader);
+ }
}
FlowFile transformed = session.write(original, new OutputStreamCallback() {
@@ -212,12 +273,34 @@ public class JoltTransformJSON extends AbstractProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
- Object specJson = null;
- if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
- specJson = JsonUtils.jsonToObject(context.getProperty(JOLT_SPEC).getValue(), DEFAULT_CHARSET);
+
+ try{
+ Object specJson = null;
+
+ if(context.getProperty(MODULES).isSet()){
+ customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(),this.getClass().getClassLoader(),getJarFilenameFilter());
+ }else{
+ customClassLoader = this.getClass().getClassLoader();
+ }
+
+ if(context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
+ specJson = JsonUtils.jsonToObject(context.getProperty(JOLT_SPEC).getValue(), DEFAULT_CHARSET);
+ }
+
+ if(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())){
+ transform = TransformFactory.getCustomTransform(customClassLoader,context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+ }else {
+ transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
+ }
+
+ } catch (Exception ex){
+ getLogger().error("Unable to setup processor",ex);
}
- transform = TransformFactory.getTransform(context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
+
}
+ protected FilenameFilter getJarFilenameFilter(){
+ return (dir, name) -> (name != null && name.endsWith(".jar"));
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoltTransformJSON/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoltTransformJSON/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoltTransformJSON/additionalDetails.html
index f2f1c05..ca0c0ce 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoltTransformJSON/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoltTransformJSON/additionalDetails.html
@@ -28,6 +28,9 @@
The Jolt utilities processing JSON are not not stream based therefore large JSON document
transformation may consume large amounts of memory. Currently UTF-8 FlowFile content and Jolt specifications are supported.
+ Custom Jolt Transformations (that implement the Transform interface) are supported. Modules containing custom libraries which do not
+ existing on the current class path can be included via the custom module directory property.
+
<Strong>Note:</Strong> When configuring a processor if user selects of the Default transformation yet provides a
Chain specification the system does not alert that the specification is invalid and and will produce failed flow files.
This is a known issue identified within the Jolt library.
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestCustomJoltTransform.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestCustomJoltTransform.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestCustomJoltTransform.java
new file mode 100644
index 0000000..d70046d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestCustomJoltTransform.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.bazaarvoice.jolt.Chainr;
+
+import com.bazaarvoice.jolt.SpecDriven;
+import com.bazaarvoice.jolt.Transform;
+
+
+public class TestCustomJoltTransform implements SpecDriven,Transform {
+
+ final private Transform customTransform;
+
+ public TestCustomJoltTransform(Object specJson) {
+ this.customTransform = Chainr.fromSpec(specJson);
+ }
+
+ @Override
+ public Object transform(Object o) {
+ return customTransform.transform(o);
+ }
+
+ public static void main(String[] args) {
+ System.out.println("This is a Test Custom Transform");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
index 79cc95c..99f46e4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
@@ -116,6 +116,56 @@ public class TestJoltTransformJSON {
}
@Test
+ public void testCustomTransformationWithNoModule() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/customChainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS, "TestCustomJoltTransform");
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.CUSTOMR);
+ runner.enqueue(JSON_INPUT);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ }
+
+ @Test
+ public void testCustomTransformationWithMissingClassName() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+ runner.enqueue(JSON_INPUT);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testCustomTransformationWithInvalidClassPath() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson/FakeCustomJar.jar";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+ runner.enqueue(JSON_INPUT);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testCustomTransformationWithInvalidClassName() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"FakeCustomJoltTransform");
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+ runner.enqueue(JSON_INPUT);
+ runner.assertNotValid();
+ }
+
+ @Test
public void testTransformInputWithChainr() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
@@ -228,5 +278,83 @@ public class TestJoltTransformJSON {
assertTrue(compareJsonString.equals(transformedJsonString));
}
+ @Test
+ public void testTransformInputWithCustomTransformationWithJar() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+ runner.enqueue(JSON_INPUT);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+ transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+ Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
+ Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
+ assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+ }
+
+ @Test
+ public void testTransformInputWithCustomTransformationWithDir() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+ runner.enqueue(JSON_INPUT);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+ transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+ Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
+ Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
+ assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+ }
+
+ @Test
+ public void testTransformInputWithChainrEmbeddedCustomTransformation() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/customChainrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC,spec);
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.enqueue(JSON_INPUT);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+ transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+ Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
+ Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
+ assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+ }
+
+ @Test
+ public void testTransformInputCustomTransformationIgnored() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
+ final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/defaultrSpec.json")));
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
+ runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.DEFAULTR);
+ runner.enqueue(JSON_INPUT);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+ final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+ transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+ transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+ Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
+ Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/defaultrOutput.json")));
+ assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar
new file mode 100644
index 0000000..b738658
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar differ
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/customChainrSpec.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/customChainrSpec.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/customChainrSpec.json
new file mode 100644
index 0000000..eef0933
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoltTransformJson/customChainrSpec.json
@@ -0,0 +1,35 @@
+[
+ {
+ "operation":"TestCustomJoltTransform",
+ "spec" :
+ [
+ {
+ "operation": "shift",
+ "spec": {
+ "rating": {
+ "primary": {
+ "value": "Rating",
+ "max": "RatingRange"
+ },
+ "*": {
+ "max": "SecondaryRatings.&1.Range",
+ "value": "SecondaryRatings.&1.Value",
+ "$": "SecondaryRatings.&1.Id"
+ }
+ }
+ }
+ },
+ {
+ "operation": "default",
+ "spec": {
+ "Range": 5,
+ "SecondaryRatings": {
+ "*": {
+ "Range": 5
+ }
+ }
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/pom.xml
index f8a149f..85ce20c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/pom.xml
@@ -23,6 +23,11 @@ language governing permissions and limitations under the License. -->
<dependencies>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>com.bazaarvoice.jolt</groupId>
<artifactId>jolt-core</artifactId>
<version>0.0.20</version>
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/main/java/org/apache/nifi/processors/standard/util/TransformFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/main/java/org/apache/nifi/processors/standard/util/TransformFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/main/java/org/apache/nifi/processors/standard/util/TransformFactory.java
index 5bb8374..ef0e35c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/main/java/org/apache/nifi/processors/standard/util/TransformFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/main/java/org/apache/nifi/processors/standard/util/TransformFactory.java
@@ -17,30 +17,99 @@
package org.apache.nifi.processors.standard.util;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import com.bazaarvoice.jolt.CardinalityTransform;
import com.bazaarvoice.jolt.Chainr;
import com.bazaarvoice.jolt.Defaultr;
+import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.Removr;
import com.bazaarvoice.jolt.Shiftr;
import com.bazaarvoice.jolt.Sortr;
+import com.bazaarvoice.jolt.SpecDriven;
import com.bazaarvoice.jolt.Transform;
+import com.bazaarvoice.jolt.chainr.spec.ChainrEntry;
+import com.bazaarvoice.jolt.exception.SpecException;
public class TransformFactory {
- public static Transform getTransform(String transform, Object specJson) {
- if (transform.equals("jolt-transform-default")) {
+ public static Transform getTransform(final ClassLoader classLoader,final String transformType, final Object specJson) throws Exception {
+
+ if (transformType.equals("jolt-transform-default")) {
return new Defaultr(specJson);
- } else if (transform.equals("jolt-transform-shift")) {
+ } else if (transformType.equals("jolt-transform-shift")) {
return new Shiftr(specJson);
- } else if (transform.equals("jolt-transform-remove")) {
+ } else if (transformType.equals("jolt-transform-remove")) {
return new Removr(specJson);
- } else if (transform.equals("jolt-transform-card")) {
+ } else if (transformType.equals("jolt-transform-card")) {
return new CardinalityTransform(specJson);
- } else if(transform.equals("jolt-transform-sort")){
+ } else if(transformType.equals("jolt-transform-sort")){
return new Sortr();
+ } else{
+ return new Chainr(getChainrJoltTransformations(classLoader,specJson));
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Transform getCustomTransform(final ClassLoader classLoader, final String customTransformType, final Object specJson) throws Exception {
+ final Class clazz = classLoader.loadClass(customTransformType);
+ if(SpecDriven.class.isAssignableFrom(clazz)){
+ final Constructor constructor = clazz.getConstructor(Object.class);
+ return (Transform) constructor.newInstance(specJson);
+ }else{
+ return (Transform) clazz.newInstance();
+ }
+ }
+
+
+ protected static List<JoltTransform> getChainrJoltTransformations(ClassLoader classLoader, Object specJson) throws Exception{
+ if(!(specJson instanceof List)) {
+ throw new SpecException("JOLT Chainr expects a JSON array of objects - Malformed spec.");
} else {
- return Chainr.fromSpec(specJson);
+
+ List operations = (List)specJson;
+
+ if(operations.isEmpty()) {
+ throw new SpecException("JOLT Chainr passed an empty JSON array.");
+ } else {
+
+ ArrayList<JoltTransform> entries = new ArrayList<JoltTransform>(operations.size());
+
+ for(Object chainrEntryObj : operations) {
+
+ if(!(chainrEntryObj instanceof Map)) {
+ throw new SpecException("JOLT ChainrEntry expects a JSON map - Malformed spec");
+ } else {
+ Map chainrEntryMap = (Map)chainrEntryObj;
+ String opString = (String) chainrEntryMap.get("operation");
+ String operationClassName;
+
+ if(opString == null) {
+ throw new SpecException("JOLT Chainr \'operation\' must implement Transform or ContextualTransform");
+ } else {
+
+ if(ChainrEntry.STOCK_TRANSFORMS.containsKey(opString)) {
+ operationClassName = ChainrEntry.STOCK_TRANSFORMS.get(opString);
+ } else {
+ operationClassName = opString;
+ }
+
+ entries.add(getCustomTransform(classLoader,operationClassName,chainrEntryMap.get("spec")));
+ }
+ }
+ }
+
+ return entries;
+ }
}
+
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/java/org/apache/nifi/processors/standard/util/TestTransformFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/java/org/apache/nifi/processors/standard/util/TestTransformFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/java/org/apache/nifi/processors/standard/util/TestTransformFactory.java
index e2be189..01e9238 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/java/org/apache/nifi/processors/standard/util/TestTransformFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/java/org/apache/nifi/processors/standard/util/TestTransformFactory.java
@@ -17,8 +17,10 @@
package org.apache.nifi.processors.standard.util;
-import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
@@ -38,52 +40,76 @@ public class TestTransformFactory {
@Test
- public void testGetChainTransform() throws IOException{
+ public void testGetChainTransform() throws Exception{
final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
- Transform transform = TransformFactory.getTransform("jolt-transform-chain",JsonUtils.jsonToObject(chainrSpec));
+ Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain",JsonUtils.jsonToObject(chainrSpec));
assertTrue(transform instanceof Chainr);
}
@Test
- public void testGetDefaultTransform() throws IOException{
+ public void testGetDefaultTransform() throws Exception{
final String defaultrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/defaultrSpec.json")));
- Transform transform = TransformFactory.getTransform("jolt-transform-default",JsonUtils.jsonToObject(defaultrSpec));
+ Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-default",JsonUtils.jsonToObject(defaultrSpec));
assertTrue(transform instanceof Defaultr);
}
@Test
- public void testGetSortTransform() throws IOException{
- Transform transform = TransformFactory.getTransform("jolt-transform-sort",null);
+ public void testGetSortTransform() throws Exception{
+ Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-sort",null);
assertTrue(transform instanceof Sortr);
}
@Test
- public void testGetShiftTransform() throws IOException{
+ public void testGetShiftTransform() throws Exception{
final String shiftrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/shiftrSpec.json")));
- Transform transform = TransformFactory.getTransform("jolt-transform-shift",JsonUtils.jsonToObject(shiftrSpec));
+ Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-shift",JsonUtils.jsonToObject(shiftrSpec));
assertTrue(transform instanceof Shiftr);
}
@Test
- public void testGetRemoveTransform() throws IOException{
+ public void testGetRemoveTransform() throws Exception{
final String removrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/removrSpec.json")));
- Transform transform = TransformFactory.getTransform("jolt-transform-remove",JsonUtils.jsonToObject(removrSpec));
+ Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-remove",JsonUtils.jsonToObject(removrSpec));
assertTrue(transform instanceof Removr);
}
@Test
- public void testGetCardinalityTransform() throws IOException{
+ public void testGetCardinalityTransform() throws Exception{
final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/cardrSpec.json")));
- Transform transform = TransformFactory.getTransform("jolt-transform-card",JsonUtils.jsonToObject(cardrSpec));
+ Transform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-card",JsonUtils.jsonToObject(cardrSpec));
assertTrue(transform instanceof CardinalityTransform);
}
+ @Test
public void testGetInvalidTransformWithNoSpec() {
try{
- TransformFactory.getTransform("jolt-transform-chain",null);
+ TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain",null);
}catch (Exception e){
- assertTrue(e.toString().equals("JOLT Chainr expects a JSON array of objects - Malformed spec."));
+ assertTrue(e.getLocalizedMessage().equals("JOLT Chainr expects a JSON array of objects - Malformed spec."));
}
}
+ @Test
+ public void testGetCustomTransformation() throws Exception{
+ final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
+ Path jarFilePath = Paths.get("src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar");
+ URL[] urlPaths = new URL[1];
+ urlPaths[0] = jarFilePath.toUri().toURL();
+ ClassLoader customClassLoader = new URLClassLoader(urlPaths,this.getClass().getClassLoader());
+ Transform transform = TransformFactory.getCustomTransform(customClassLoader,"TestCustomJoltTransform",JsonUtils.jsonToObject(chainrSpec));
+ assertTrue(transform != null);
+ assertTrue(transform.getClass().getName().equals("TestCustomJoltTransform"));
+ }
+
+ @Test
+ public void testGetCustomTransformationNotFound() throws Exception{
+ final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json")));
+ try {
+ TransformFactory.getCustomTransform(this.getClass().getClassLoader(), "TestCustomJoltTransform", chainrSpec);
+ }catch (ClassNotFoundException cnf){
+ assertTrue(cnf.getLocalizedMessage().equals("TestCustomJoltTransform"));
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/048ba536/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar
new file mode 100644
index 0000000..b738658
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-utils/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar differ