You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2016/12/22 06:23:40 UTC

nifi git commit: NIFI-3236 - SplitJson performance improvements

Repository: nifi
Updated Branches:
  refs/heads/master 428515767 -> 44c9ea0a6


NIFI-3236 - SplitJson performance improvements

This closes #1347.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/44c9ea0a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/44c9ea0a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/44c9ea0a

Branch: refs/heads/master
Commit: 44c9ea0a660b5d3ee9551307a6b83249f4e8491c
Parents: 4285157
Author: Bryan Rosander <br...@apache.org>
Authored: Tue Dec 20 16:39:22 2016 -0500
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Dec 22 15:22:53 2016 +0900

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitJson.java     | 39 ++++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/44c9ea0a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index caa5a0b..003834e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -20,11 +20,12 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
@@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -101,6 +103,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
     private Set<Relationship> relationships;
 
     private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>();
+    private volatile String nullDefaultValue;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -156,6 +159,11 @@ public class SplitJson extends AbstractJsonPathProcessor {
         return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext));
     }
 
+    @OnScheduled
+    public void onScheduled(ProcessContext processContext) {
+        nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
+    }
+
     @Override
     public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
         final FlowFile original = processSession.get();
@@ -165,7 +173,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
 
         final ComponentLog logger = getLogger();
 
-        DocumentContext documentContext = null;
+        DocumentContext documentContext;
         try {
             documentContext = validateAndEstablishJsonContext(processSession, original);
         } catch (InvalidJsonException e) {
@@ -175,10 +183,6 @@ public class SplitJson extends AbstractJsonPathProcessor {
         }
 
         final JsonPath jsonPath = JSON_PATH_REF.get();
-        String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
-        final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
-
-        final List<FlowFile> segments = new ArrayList<>();
 
         Object jsonPathResult;
         try {
@@ -196,9 +200,11 @@ public class SplitJson extends AbstractJsonPathProcessor {
         }
 
         List resultList = (List) jsonPathResult;
-        AtomicInteger jsonLineCount = new AtomicInteger(0);
 
-        final String fragmentIdentifier = UUID.randomUUID().toString();
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("fragment.identifier", UUID.randomUUID().toString());
+        attributes.put("fragment.count", Integer.toString(resultList.size()));
+
         for (int i = 0; i < resultList.size(); i++) {
             Object resultSegment = resultList.get(i);
             FlowFile split = processSession.create(original);
@@ -207,19 +213,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
                         out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
                     }
             );
-            split = processSession.putAttribute(split, "fragment.identifier", fragmentIdentifier);
-            split = processSession.putAttribute(split, "fragment.index", Integer.toString(i));
-            split = processSession.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
-            segments.add(split);
-            jsonLineCount.incrementAndGet();
+            attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
+            attributes.put("fragment.index", Integer.toString(i));
+            processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
         }
 
-        segments.forEach((segment) -> {
-            segment = processSession.putAttribute(segment, "fragment.count", Integer.toString(jsonLineCount.get()));
-            processSession.transfer(segment, REL_SPLIT);
-        });
-
         processSession.transfer(original, REL_ORIGINAL);
-        logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
+        logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
     }
-}
+}
\ No newline at end of file