You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/02/18 05:30:09 UTC
[07/29] incubator-nifi git commit: Providing validation of the input
FlowFile as JSON
Providing validation of the input FlowFile as JSON
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c3c4d369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c3c4d369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c3c4d369
Branch: refs/heads/NIFI-360
Commit: c3c4d36944fa8a773d042edefa8cfa0f58edcd07
Parents: 40da65f
Author: Aldrin Piri <al...@gmail.com>
Authored: Mon Feb 16 10:56:06 2015 -0500
Committer: Aldrin Piri <al...@gmail.com>
Committed: Mon Feb 16 10:56:06 2015 -0500
----------------------------------------------------------------------
.../processors/standard/EvaluateJsonPath.java | 55 +++++++++++++++++---
.../standard/TestEvaluateJsonPath.java | 15 +++++-
2 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c3c4d369/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 4e1c6ba..41da277 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -16,9 +16,9 @@
*/
package org.apache.nifi.processors.standard;
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.InvalidPathException;
-import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.*;
+import com.jayway.jsonpath.spi.json.JsonProvider;
+import net.minidev.json.JSONValue;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -32,10 +32,11 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
+import java.io.InputStreamReader;
import java.util.*;
@EventDriven
@@ -63,6 +64,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
+ private static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
+
@Override
protected void init(final ProcessorInitializationContext context) {
@@ -122,19 +125,56 @@ public class EvaluateJsonPath extends AbstractProcessor {
}
@Override
- public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+ public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
final FlowFile flowFile = processSession.get();
if (flowFile == null) {
return;
}
+
+ // Determine the destination
+
+ final String destination = processContext.getProperty(DESTINATION).getValue();
+
+ final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
+
+ // Parse the document once to support multiple path evaluations if specified
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
- // Parse the document once to support multiple path evaluations if specified
- Object document = Configuration.defaultConfiguration().jsonProvider().parse(in, StandardCharsets.UTF_8.displayName());
+ /*
+ * JSONValue#isValidJson is permissive to the degree of the Smart JSON definition.
+ * Accordingly, a strict JSON approach is preferred in determining whether or not a document is valid.
+ */
+ boolean validJson = JSONValue.isValidJsonStrict(new InputStreamReader(in));
+ if (validJson) {
+ DocumentContext ctx = JsonPath.parse(in);
+ contextHolder.set(ctx);
+ } else {
+ getLogger().error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile.getId()});
+ processSession.transfer(flowFile, REL_FAILURE);
+ }
}
});
+
+ DocumentContext documentContext = contextHolder.get();
+
+ if (documentContext == null) {
+ return;
+ }
+
+ try {
+ switch (destination) {
+ case DESTINATION_ATTRIBUTE:
+ break;
+ case DESTINATION_CONTENT:
+ break;
+ }
+ processSession.transfer(flowFile, REL_MATCH);
+ } catch (PathNotFoundException e) {
+ getLogger().warn("FlowFile {} could not be read from.", new Object[]{flowFile.getId()}, e);
+ processSession.transfer(flowFile, REL_NO_MATCH);
+ }
}
private static class JsonPathValidator implements Validator {
@@ -149,4 +189,5 @@ public class EvaluateJsonPath extends AbstractProcessor {
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c3c4d369/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
index 9fb1130..a4e65cf 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@@ -27,6 +28,7 @@ import java.nio.file.Paths;
public class TestEvaluateJsonPath {
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
+ private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
@Test(expected = AssertionError.class)
public void testInvalidJsonPath() {
@@ -36,5 +38,16 @@ public class TestEvaluateJsonPath {
Assert.fail("An improper JsonPath expression was not detected as being invalid.");
}
-
+
+ @Test
+ public void testInvalidJsonDocument() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
+ testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
+
+ testRunner.enqueue(XML_SNIPPET);
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(EvaluateJsonPath.REL_FAILURE, 1);
+ final MockFlowFile out = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_FAILURE).get(0);
+ }
}