You are viewing a plain-text version of this content. The canonical link to the original (a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/12/21 21:29:52 UTC

nifi git commit: NIFI-3235 - EvaluateJsonPath performance improvements

Repository: nifi
Updated Branches:
  refs/heads/master 721c9ee7f -> 428515767


NIFI-3235 - EvaluateJsonPath performance improvements

This closes #1346.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/42851576
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/42851576
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/42851576

Branch: refs/heads/master
Commit: 42851576759fdba9de19b24f1ca102cf53cbc4a0
Parents: 721c9ee
Author: Bryan Rosander <br...@apache.org>
Authored: Tue Dec 20 12:29:15 2016 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Dec 21 16:29:23 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/EvaluateJsonPath.java   | 156 ++++++++++---------
 1 file changed, 85 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/42851576/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 813a07d..cddfceb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
+import java.io.BufferedOutputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -26,8 +26,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
@@ -40,6 +42,8 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -50,14 +54,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.PathNotFoundException;
-import java.util.concurrent.atomic.AtomicReference;
+
+import java.util.stream.Collectors;
 
 @EventDriven
 @SideEffectFree
@@ -140,6 +143,13 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
 
     private final ConcurrentMap<String, JsonPath> cachedJsonPathMap = new ConcurrentHashMap<>();
 
+    private final Queue<Set<Map.Entry<String, JsonPath>>> attributeToJsonPathEntrySetQueue = new ConcurrentLinkedQueue<>();
+    private volatile String representationOption;
+    private volatile boolean destinationIsAttribute;
+    private volatile String returnType;
+    private volatile String pathNotFound;
+    private volatile String nullDefaultValue;
+
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final Set<Relationship> relationships = new HashSet<>();
@@ -230,9 +240,25 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
         }
     }
 
+    @OnScheduled
+    public void onScheduled(ProcessContext processContext) {
+        representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
+        destinationIsAttribute = DESTINATION_ATTRIBUTE.equals(processContext.getProperty(DESTINATION).getValue());
+        returnType = processContext.getProperty(RETURN_TYPE).getValue();
+        if (returnType.equals(RETURN_TYPE_AUTO)) {
+            returnType = destinationIsAttribute ? RETURN_TYPE_SCALAR : RETURN_TYPE_JSON;
+        }
+        pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
+        nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        attributeToJsonPathEntrySetQueue.clear();
+    }
+
     @Override
     public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
-
         FlowFile flowFile = processSession.get();
         if (flowFile == null) {
             return;
@@ -240,27 +266,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
 
         final ComponentLog logger = getLogger();
 
-        String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
-        final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
-
-        /* Build the JsonPath expressions from attributes */
-        final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
-
-        for (final Map.Entry<PropertyDescriptor, String> entry : processContext.getProperties().entrySet()) {
-            if (!entry.getKey().isDynamic()) {
-                continue;
-            }
-            final JsonPath jsonPath = JsonPath.compile(entry.getValue());
-            attributeToJsonPathMap.put(entry.getKey().getName(), jsonPath);
-        }
-
-        final String destination = processContext.getProperty(DESTINATION).getValue();
-        String returnType = processContext.getProperty(RETURN_TYPE).getValue();
-        if (returnType.equals(RETURN_TYPE_AUTO)) {
-            returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
-        }
-
-        DocumentContext documentContext = null;
+        DocumentContext documentContext;
         try {
             documentContext = validateAndEstablishJsonContext(processSession, flowFile);
         } catch (InvalidJsonException e) {
@@ -269,59 +275,67 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
             return;
         }
 
-        final Map<String, String> jsonPathResults = new HashMap<>();
-
-        for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
-
-            final String jsonPathAttrKey = attributeJsonPathEntry.getKey();
-            final JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
-            final String pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
-
-            final AtomicReference<Object> resultHolder = new AtomicReference<>(null);
-            try {
-                final Object result = documentContext.read(jsonPathExp);
-                if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result)) {
-                    logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
-                            new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
-                    processSession.transfer(flowFile, REL_FAILURE);
-                    return;
-                }
-                resultHolder.set(result);
-            } catch (PathNotFoundException e) {
-
-                if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) {
-                    logger.warn("FlowFile {} could not find path {} for attribute key {}.",
-                            new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
-                }
+        Set<Map.Entry<String, JsonPath>> attributeJsonPathEntries = attributeToJsonPathEntrySetQueue.poll();
+        if (attributeJsonPathEntries == null) {
+            attributeJsonPathEntries = processContext.getProperties().entrySet().stream()
+                    .filter(e -> e.getKey().isDynamic())
+                    .collect(Collectors.toMap(e -> e.getKey().getName(), e -> JsonPath.compile(e.getValue())))
+                    .entrySet();
+        }
 
-                if (destination.equals(DESTINATION_ATTRIBUTE)) {
-                    jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
-                    continue;
-                } else {
-                    processSession.transfer(flowFile, REL_NO_MATCH);
-                    return;
+        try {
+            // We'll only be using this map if destinationIsAttribute == true
+            final Map<String, String> jsonPathResults = destinationIsAttribute ? new HashMap<>(attributeJsonPathEntries.size()) : Collections.EMPTY_MAP;
+
+            for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeJsonPathEntries) {
+                final String jsonPathAttrKey = attributeJsonPathEntry.getKey();
+                final JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
+
+                Object result;
+                try {
+                    Object potentialResult = documentContext.read(jsonPathExp);
+                    if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(potentialResult)) {
+                        logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
+                                new Object[]{jsonPathExp.getPath(), flowFile.getId(), potentialResult.toString(), REL_FAILURE.getName()});
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                    result = potentialResult;
+                } catch (PathNotFoundException e) {
+                    if (pathNotFound.equals(PATH_NOT_FOUND_WARN)) {
+                        logger.warn("FlowFile {} could not find path {} for attribute key {}.",
+                                new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
+                    }
+
+                    if (destinationIsAttribute) {
+                        jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
+                        continue;
+                    } else {
+                        processSession.transfer(flowFile, REL_NO_MATCH);
+                        return;
+                    }
                 }
-            }
 
-            final String resultRepresentation = getResultRepresentation(resultHolder.get(), nullDefaultValue);
-            switch (destination) {
-                case DESTINATION_ATTRIBUTE:
+                final String resultRepresentation = getResultRepresentation(result, nullDefaultValue);
+                if (destinationIsAttribute) {
                     jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
-                    break;
-                case DESTINATION_CONTENT:
-                    flowFile = processSession.write(flowFile, new OutputStreamCallback() {
-                        @Override
-                        public void process(final OutputStream out) throws IOException {
-                            try (OutputStream outputStream = new BufferedOutputStream(out)) {
-                                outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
-                            }
+                } else {
+                    flowFile = processSession.write(flowFile, out -> {
+                        try (OutputStream outputStream = new BufferedOutputStream(out)) {
+                            outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
                         }
                     });
                     processSession.getProvenanceReporter().modifyContent(flowFile, "Replaced content with result of expression " + jsonPathExp.getPath());
-                    break;
+                }
+            }
+
+            // jsonPathResults map will be empty if this is false
+            if (destinationIsAttribute) {
+                flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
             }
+            processSession.transfer(flowFile, REL_MATCH);
+        } finally {
+            attributeToJsonPathEntrySetQueue.offer(attributeJsonPathEntries);
         }
-        flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
-        processSession.transfer(flowFile, REL_MATCH);
     }
 }