You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2016/12/22 06:23:40 UTC
nifi git commit: NIFI-3236 - SplitJson performance improvements
Repository: nifi
Updated Branches:
refs/heads/master 428515767 -> 44c9ea0a6
NIFI-3236 - SplitJson performance improvements
This closes #1347.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/44c9ea0a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/44c9ea0a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/44c9ea0a
Branch: refs/heads/master
Commit: 44c9ea0a660b5d3ee9551307a6b83249f4e8491c
Parents: 4285157
Author: Bryan Rosander <br...@apache.org>
Authored: Tue Dec 20 16:39:22 2016 -0500
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Dec 22 15:22:53 2016 +0900
----------------------------------------------------------------------
.../nifi/processors/standard/SplitJson.java | 39 ++++++++++----------
1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/44c9ea0a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index caa5a0b..003834e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -20,11 +20,12 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
@@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -101,6 +103,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
private Set<Relationship> relationships;
private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>();
+ private volatile String nullDefaultValue;
@Override
protected void init(final ProcessorInitializationContext context) {
@@ -156,6 +159,11 @@ public class SplitJson extends AbstractJsonPathProcessor {
return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext));
}
+ @OnScheduled
+ public void onScheduled(ProcessContext processContext) {
+ nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
+ }
+
@Override
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
final FlowFile original = processSession.get();
@@ -165,7 +173,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
final ComponentLog logger = getLogger();
- DocumentContext documentContext = null;
+ DocumentContext documentContext;
try {
documentContext = validateAndEstablishJsonContext(processSession, original);
} catch (InvalidJsonException e) {
@@ -175,10 +183,6 @@ public class SplitJson extends AbstractJsonPathProcessor {
}
final JsonPath jsonPath = JSON_PATH_REF.get();
- String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
- final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
-
- final List<FlowFile> segments = new ArrayList<>();
Object jsonPathResult;
try {
@@ -196,9 +200,11 @@ public class SplitJson extends AbstractJsonPathProcessor {
}
List resultList = (List) jsonPathResult;
- AtomicInteger jsonLineCount = new AtomicInteger(0);
- final String fragmentIdentifier = UUID.randomUUID().toString();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("fragment.identifier", UUID.randomUUID().toString());
+ attributes.put("fragment.count", Integer.toString(resultList.size()));
+
for (int i = 0; i < resultList.size(); i++) {
Object resultSegment = resultList.get(i);
FlowFile split = processSession.create(original);
@@ -207,19 +213,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
);
- split = processSession.putAttribute(split, "fragment.identifier", fragmentIdentifier);
- split = processSession.putAttribute(split, "fragment.index", Integer.toString(i));
- split = processSession.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
- segments.add(split);
- jsonLineCount.incrementAndGet();
+ attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
+ attributes.put("fragment.index", Integer.toString(i));
+ processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
}
- segments.forEach((segment) -> {
- segment = processSession.putAttribute(segment, "fragment.count", Integer.toString(jsonLineCount.get()));
- processSession.transfer(segment, REL_SPLIT);
- });
-
processSession.transfer(original, REL_ORIGINAL);
- logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
+ logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
}
-}
+}
\ No newline at end of file