You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/02/18 05:30:09 UTC

[07/29] incubator-nifi git commit: Providing validation of the input FlowFile as JSON

Providing validation of the input FlowFile as JSON


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c3c4d369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c3c4d369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c3c4d369

Branch: refs/heads/NIFI-360
Commit: c3c4d36944fa8a773d042edefa8cfa0f58edcd07
Parents: 40da65f
Author: Aldrin Piri <al...@gmail.com>
Authored: Mon Feb 16 10:56:06 2015 -0500
Committer: Aldrin Piri <al...@gmail.com>
Committed: Mon Feb 16 10:56:06 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/EvaluateJsonPath.java   | 55 +++++++++++++++++---
 .../standard/TestEvaluateJsonPath.java          | 15 +++++-
 2 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c3c4d369/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 4e1c6ba..41da277 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -16,9 +16,9 @@
  */
 package org.apache.nifi.processors.standard;
 
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.InvalidPathException;
-import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.*;
+import com.jayway.jsonpath.spi.json.JsonProvider;
+import net.minidev.json.JSONValue;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -32,10 +32,11 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.*;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.util.ObjectHolder;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
+import java.io.InputStreamReader;
 import java.util.*;
 
 @EventDriven
@@ -63,6 +64,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
+    private static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
+
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -122,19 +125,56 @@ public class EvaluateJsonPath extends AbstractProcessor {
     }
 
     @Override
-    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+    public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
 
         final FlowFile flowFile = processSession.get();
         if (flowFile == null) {
             return;
         }
+
+        // Read the configured DESTINATION property; its value selects the switch branch further down
+
+        final String destination = processContext.getProperty(DESTINATION).getValue();
+
+        final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
+
+        // Parse the document once to support multiple path evaluations if specified; the result leaves the callback via contextHolder
         processSession.read(flowFile, new InputStreamCallback() {
             @Override
             public void process(InputStream in) throws IOException {
-                // Parse the document once to support multiple path evaluations if specified
-                Object document = Configuration.defaultConfiguration().jsonProvider().parse(in, StandardCharsets.UTF_8.displayName());
+                /*
+                 * JSONValue#isValidJson accepts the permissive "Smart JSON" dialect, so the strict check is used to decide validity.
+                 * NOTE(review): the strict check reads from 'in' and JsonPath.parse(in) below is handed the same stream - confirm it is not already consumed.
+                 */
+                boolean validJson = JSONValue.isValidJsonStrict(new InputStreamReader(in)); // NOTE(review): reader is created without an explicit charset
+                if (validJson) {
+                    DocumentContext ctx = JsonPath.parse(in);
+                    contextHolder.set(ctx);
+                } else {
+                    getLogger().error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile.getId()});
+                    processSession.transfer(flowFile, REL_FAILURE); // contextHolder keeps its initial null, so onTrigger returns early below
+                }
             }
         });
+
+        DocumentContext documentContext = contextHolder.get();
+
+        if (documentContext == null) { // the read callback stored no context; nothing further is done for this FlowFile here
+            return;
+        }
+
+        try {
+            switch (destination) {
+                case DESTINATION_ATTRIBUTE: // empty in this revision: no attribute handling yet
+                    break;
+                case DESTINATION_CONTENT: // empty in this revision: no content handling yet
+                    break;
+            }
+            processSession.transfer(flowFile, REL_MATCH);
+        } catch (PathNotFoundException e) { // NOTE(review): nothing in the try above visibly evaluates a JsonPath - confirm what can throw this
+            getLogger().warn("FlowFile {} could not be read from.", new Object[]{flowFile.getId()}, e);
+            processSession.transfer(flowFile, REL_NO_MATCH);
+        }
     }
 
     private static class JsonPathValidator implements Validator {
@@ -149,4 +189,5 @@ public class EvaluateJsonPath extends AbstractProcessor {
             return new ValidationResult.Builder().valid(error == null).explanation(error).build();
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c3c4d369/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
index 9fb1130..a4e65cf 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -27,6 +28,7 @@ import java.nio.file.Paths;
 public class TestEvaluateJsonPath {
 
     private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
+    private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
 
     @Test(expected = AssertionError.class)
     public void testInvalidJsonPath() {
@@ -36,5 +38,16 @@ public class TestEvaluateJsonPath {
 
         Assert.fail("An improper JsonPath expression was not detected as being invalid.");
     }
-    
+
+    @Test
+    public void testInvalidJsonDocument() throws Exception { // a non-JSON FlowFile is expected to end up in REL_FAILURE
+        final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
+        testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
+
+        testRunner.enqueue(XML_SNIPPET); // the XML sample file serves as the invalid-JSON payload
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EvaluateJsonPath.REL_FAILURE, 1);
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_FAILURE).get(0); // NOTE(review): 'out' is fetched but nothing is asserted on it
+    }
 }