You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/02/18 05:30:21 UTC

[19/29] incubator-nifi git commit: Providing a SplitJson processor which will break JSON Arrays into their individual elements. Refactored supporting JsonUtils code and EvaluateJsonPath to reuse common functionality.

Providing a SplitJson processor which will break JSON Arrays into their individual elements.  Refactored supporting JsonUtils code and EvaluateJsonPath to reuse common functionality.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2e05dcbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2e05dcbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2e05dcbb

Branch: refs/heads/NIFI-360
Commit: 2e05dcbbfdb37843456dff121d9878d8679d7067
Parents: 59ad194
Author: Aldrin Piri <al...@gmail.com>
Authored: Tue Feb 17 15:15:53 2015 -0500
Committer: Aldrin Piri <al...@gmail.com>
Committed: Tue Feb 17 15:17:12 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/EvaluateJsonPath.java   |   8 +-
 .../nifi/processors/standard/SplitJson.java     | 140 +++++++++++++++++++
 .../processors/standard/util/JsonUtils.java     |   8 ++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestEvaluateJsonPath.java          |   3 +-
 .../nifi/processors/standard/TestSplitJson.java | 115 +++++++++++++++
 6 files changed, 268 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 0bdd20c..8cdf1a2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -192,7 +192,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
                 final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
                 try {
                     Object result = documentContext.read(jsonPathExp);
-                    if (returnType.equals(RETURN_TYPE_SCALAR) && !isScalar(result)) {
+                    if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) {
                         logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
                                 new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
                         processSession.transfer(flowFile, REL_FAILURE);
@@ -233,15 +233,11 @@ public class EvaluateJsonPath extends AbstractProcessor {
     }
 
     private static String getResultRepresentation(Object jsonPathResult) {
-        if (isScalar(jsonPathResult)) {
+        if (JsonUtils.isJsonScalar(jsonPathResult)) {
             return jsonPathResult.toString();
         }
         return JsonUtils.JSON_PROVIDER.toJson(jsonPathResult);
     }
 
-    private static boolean isScalar(Object obj) {
-        return (obj instanceof String);
-    }
-
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
new file mode 100644
index 0000000..e589b48
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.standard.util.JsonUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "split", "jsonpath"})
+@CapabilityDescription("Splits a JSON File into multiple, separate FlowFiles for an array element specified by a JsonPath expression. "
+        + "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split', "
+        + "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
+        + "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
+public class SplitJson extends AbstractProcessor {
+
+    public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
+            .name("JsonPath Expression")
+            .description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
+            .required(true)
+            .addValidator(JsonUtils.JSON_PATH_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
+    public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified path does not exist), it will be routed to this relationship").build();
+
+    /**
+     * Content written for array elements that are {@code null}, so each split is still a valid JSON literal.
+     */
+    private static final String NULL_ELEMENT_REPRESENTATION = "null";
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ARRAY_JSON_PATH_EXPRESSION);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_SPLIT);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Splits the array selected by {@link #ARRAY_JSON_PATH_EXPRESSION} into one FlowFile per element.
+     * <p>
+     * The incoming FlowFile is routed to {@link #REL_FAILURE} when its content is not valid JSON, when the
+     * JsonPath is not found in the document, or when the JsonPath does not evaluate to an array. Otherwise
+     * each element is written to a child FlowFile routed to {@link #REL_SPLIT} and the incoming FlowFile is
+     * routed to {@link #REL_ORIGINAL}.
+     */
+    @Override
+    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
+        final FlowFile original = processSession.get();
+        if (original == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, original);
+        if (documentContext == null) {
+            logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
+            processSession.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
+        final JsonPath jsonPath = JsonPath.compile(jsonPathExpression);
+
+        final Object jsonPathResult;
+        try {
+            jsonPathResult = documentContext.read(jsonPath);
+        } catch (final PathNotFoundException e) {
+            // read() signals an absent path by throwing; route to failure as the capability description
+            // states rather than letting the exception escape onTrigger.
+            logger.error("JsonPath {} could not be found for FlowFile {}. Transferring to {}.",
+                    new Object[]{jsonPath.getPath(), original, REL_FAILURE.getName()});
+            processSession.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        if (!(jsonPathResult instanceof List)) {
+            logger.error("The evaluated value {} of {} was not an array compatible type and cannot be split.",
+                    new Object[]{jsonPathResult, jsonPath.getPath()});
+            processSession.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final List<?> resultList = (List<?>) jsonPathResult;
+        final List<FlowFile> segments = new ArrayList<>(resultList.size());
+
+        for (final Object resultSegment : resultList) {
+            final String resultSegmentContent = getSegmentRepresentation(resultSegment);
+            FlowFile split = processSession.create(original);
+            split = processSession.write(split, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
+                }
+            });
+            segments.add(split);
+        }
+
+        processSession.transfer(segments, REL_SPLIT);
+        processSession.transfer(original, REL_ORIGINAL);
+        logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
+    }
+
+    /**
+     * Renders one array element as split content.
+     *
+     * @param resultSegment an element of the evaluated array; may be {@code null}
+     * @return {@code "null"} for a null element, the element's {@code toString()} for values that
+     *         {@link JsonUtils#isJsonScalar} accepts, and JSON text from {@link JsonUtils#JSON_PROVIDER} otherwise
+     */
+    private static String getSegmentRepresentation(final Object resultSegment) {
+        if (resultSegment == null) {
+            return NULL_ELEMENT_REPRESENTATION;
+        }
+        if (JsonUtils.isJsonScalar(resultSegment)) {
+            return resultSegment.toString();
+        }
+        return JsonUtils.JSON_PROVIDER.toJson(resultSegment);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
index 6eb567e..0bf33dd 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
@@ -35,6 +35,8 @@ import org.apache.nifi.util.ObjectHolder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Provides utilities for interacting with JSON elements and JsonPath expressions and results
@@ -101,4 +103,10 @@ public class JsonUtils {
 
         return isValid;
     }
+
+    public static boolean isJsonScalar(Object obj) {
+        // Anything the default provider does not model as a JSON container (Map or List), including null, is a scalar
+        return !(obj instanceof Map) && !(obj instanceof List);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8a1fd74..17b5364 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -55,6 +55,7 @@ org.apache.nifi.processors.standard.ScanAttribute
 org.apache.nifi.processors.standard.ScanContent
 org.apache.nifi.processors.standard.SegmentContent
 org.apache.nifi.processors.standard.SplitContent
+org.apache.nifi.processors.standard.SplitJson
 org.apache.nifi.processors.standard.SplitText
 org.apache.nifi.processors.standard.SplitXml
 org.apache.nifi.processors.standard.TransformXml

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
index c873969..b7b5103 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -158,7 +159,7 @@ public class TestEvaluateJsonPath {
         testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
         final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
         Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathIdAttrKey));
-        Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", "", out.getAttribute(jsonPathNameAttrKey));
+        Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", StringUtils.EMPTY, out.getAttribute(jsonPathNameAttrKey));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
new file mode 100644
index 0000000..dd6fc6d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class TestSplitJson {
+
+    private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
+    private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
+
+    @Test
+    public void testInvalidJsonPath() {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$..");
+
+        // Assert invalidity directly: under @Test(expected = AssertionError.class) a trailing Assert.fail
+        // satisfies the expectation by itself, so the test could never fail.
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testInvalidJsonDocument() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$");
+
+        testRunner.enqueue(XML_SNIPPET);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(SplitJson.REL_FAILURE, 1);
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0);
+        // Verify that the content was unchanged
+        out.assertContentEquals(XML_SNIPPET);
+    }
+
+    @Test
+    public void testSplit_nonArrayResult() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0]._id");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        Relationship expectedRel = SplitJson.REL_FAILURE;
+
+        testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
+        out.assertContentEquals(JSON_SNIPPET);
+    }
+
+    @Test
+    public void testSplit_arrayResult_oneValue() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range[?(@ == 0)]");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0");
+    }
+
+    @Test
+    public void testSplit_arrayResult_multipleValues() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        int numSplitsExpected = 10;
+
+        testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected);
+        final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
+        originalOut.assertContentEquals(JSON_SNIPPET);
+    }
+
+    @Test
+    public void testSplit_arrayResult_nonScalarValues() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[*].name");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
+    }
+
+}