You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/02/18 05:30:17 UTC

[15/29] incubator-nifi git commit: Refining logic of how errors are handled on a per-destination basis. Adding supporting tests to ensure the contract is met.

Refining logic of how errors are handled on a per-destination basis.  Adding supporting tests to ensure the contract is met.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bcebba66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bcebba66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bcebba66

Branch: refs/heads/NIFI-360
Commit: bcebba6632165b6d69eb88ad12895e7678079267
Parents: 5b145e1
Author: Aldrin Piri <al...@gmail.com>
Authored: Mon Feb 16 22:44:06 2015 -0500
Committer: Aldrin Piri <al...@gmail.com>
Committed: Mon Feb 16 22:44:06 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/EvaluateJsonPath.java   | 62 ++++++++++++++------
 .../standard/TestEvaluateJsonPath.java          | 20 ++++++-
 2 files changed, 61 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bcebba66/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index d6bcdc1..b82764d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -50,15 +50,23 @@ import java.util.*;
 @SideEffectFree
 @SupportsBatching
 @Tags({"JSON", "evaluate", "JsonPath"})
-@CapabilityDescription("")
+@CapabilityDescription("Evaluates one or more JsonPath expressions against the content of a FlowFile. The results of those expressions are assigned to "
+        + "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the "
+        + "Processor. JsonPaths are entered by adding user-defined properties; the name of the property maps to the Attribute "
+        + "Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). "
+        + "The value of the property must be a valid JsonPath expression. If the JsonPath evaluates to a JSON array or JSON object and the Return Type is "
+        + "set to 'scalar' the FlowFile will be unmodified and will be routed to failure. If the JsonPath does not "
+        + "evaluate to a scalar, the FlowFile will be routed to 'unmatched' without having its contents modified. If Destination is "
+        + "flowfile-attribute and the expression matches nothing, attributes will be created with empty strings as the value, and the "
+        + "FlowFile will always be routed to 'matched.'  If Destination is 'flowfile-content' and the expression matches nothing, "
+        + "the FlowFile will be routed to 'unmatched' without having its contents modified.")
 public class EvaluateJsonPath extends AbstractProcessor {
 
     public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
     public static final String DESTINATION_CONTENT = "flowfile-content";
 
-    public static final String RETURN_TYPE_AUTO = "auto-detect";
     public static final String RETURN_TYPE_JSON = "json";
-    public static final String RETURN_TYPE_STRING = "string";
+    public static final String RETURN_TYPE_SCALAR = "scalar";
 
     public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
             .name("Destination")
@@ -72,8 +80,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
             .name("Return Type")
             .description("Indicates the desired return type of the JSON Path expressions.  Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
             .required(true)
-            .allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_JSON, RETURN_TYPE_STRING)
-            .defaultValue(RETURN_TYPE_AUTO)
+            .allowableValues(RETURN_TYPE_JSON, RETURN_TYPE_SCALAR)
+            .defaultValue(RETURN_TYPE_JSON)
             .build();
 
     public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified as a result").build();
@@ -208,30 +216,44 @@ public class EvaluateJsonPath extends AbstractProcessor {
             final Map<String, String> jsonPathResults = new HashMap<>();
 
             // Iterate through all JsonPath entries specified
+            jsonPathEvalLoop:
             for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
 
                 String jsonPathAttrKey = attributeJsonPathEntry.getKey();
                 JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
 
 
-                final ObjectHolder<String> resultHolder = new ObjectHolder<>("");
+                final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
                 try {
-                    resultHolder.set(evaluatePathForContext(jsonPathExp, documentContext));
+                    Object result = documentContext.read(jsonPathExp);
+                    if (RETURN_TYPE.getName().equals(RETURN_TYPE_SCALAR) && !isScalar(result)) {
+                        logger.error("Unable to return a scalar value for a JsonPath {} for FlowFile {}. Transferring to {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), REL_FAILURE.getName()});
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        continue flowFileLoop;
+                    }
+                    resultHolder.set(result);
                 } catch (PathNotFoundException e) {
-                    logger.error("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
-                    jsonPathResults.put(jsonPathAttrKey, "");
+                    logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
+                    if (destination.equals(DESTINATION_ATTRIBUTE)) {
+                        jsonPathResults.put(jsonPathAttrKey, "");
+                        continue jsonPathEvalLoop;
+                    } else {
+                        processSession.transfer(flowFile, REL_NO_MATCH);
+                        continue flowFileLoop;
+                    }
                 }
 
+                final String resultRepresentation = getResultRepresentation(resultHolder.get());
                 switch (destination) {
                     case DESTINATION_ATTRIBUTE:
-                        jsonPathResults.put(jsonPathAttrKey, resultHolder.get());
+                        jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
                         break;
                     case DESTINATION_CONTENT:
                         flowFile = processSession.write(flowFile, new OutputStreamCallback() {
                             @Override
                             public void process(final OutputStream out) throws IOException {
                                 try (OutputStream outputStream = new BufferedOutputStream(out)) {
-                                    outputStream.write(resultHolder.get().getBytes(StandardCharsets.UTF_8));
+                                    outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
                                 }
                             }
                         });
@@ -243,16 +265,18 @@ public class EvaluateJsonPath extends AbstractProcessor {
         }
     }
 
-    private static String evaluatePathForContext(JsonPath path, ReadContext readCtx) {
-        Object pathResult = readCtx.read(path);
+    private static String getResultRepresentation(Object jsonPathResult) {
+        if (isScalar(jsonPathResult)) {
+            return jsonPathResult.toString();
+        }
+        return JSON_PROVIDER.toJson(jsonPathResult);
+    }
+
+    private static boolean isScalar(Object obj) {
         /*
-         *  A given path could be a JSON object or a single value, if a sole value, treat as a String; otherwise, return the
-         *  representative JSON.
+         *  A given path could be a JSON object or a single/scalar value
          */
-        if (pathResult instanceof String) {
-            return pathResult.toString();
-        }
-        return JSON_PROVIDER.toJson(pathResult);
+        return (obj instanceof String);
     }
 
     private static class JsonPathValidator implements Validator {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bcebba66/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
index bece3da..e72d069 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java
@@ -180,7 +180,7 @@ public class TestEvaluateJsonPath {
 
 
     @Test
-    public void testExtractPath_destinationAttribute_indefiniteResult() throws Exception {
+    public void testExtractPath_destinationContent_indefiniteResult() throws Exception {
         String jsonPathAttrKey = "friends.indefinite.id.list";
 
         final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
@@ -197,7 +197,7 @@ public class TestEvaluateJsonPath {
     }
 
     @Test
-    public void testExtractPath_destinationAttribute_indefiniteResult_operators() throws Exception {
+    public void testExtractPath_destinationContent_indefiniteResult_operators() throws Exception {
         String jsonPathAttrKey = "friends.indefinite.id.list";
 
         final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
@@ -212,4 +212,20 @@ public class TestEvaluateJsonPath {
         testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
         testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals("[0,1,2]");
     }
+
+    @Test
+    public void testRouteUnmatched_destinationContent_noMatch() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
+        testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_CONTENT);
+        testRunner.setProperty("jsonPath", "$[0].nonexistent.key");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        Relationship expectedRel = EvaluateJsonPath.REL_NO_MATCH;
+
+        testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
+        testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
+    }
+
 }