You are viewing a plain-text version of this content. It originally linked to a canonical version ("here"), but that hyperlink was not preserved in this copy.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/03 15:20:42 UTC
nifi git commit: NIFI-3240 - AttributesToJson performance improvements
Repository: nifi
Updated Branches:
refs/heads/master be6bcf20a -> 1b4729e44
NIFI-3240 - AttributesToJson performance improvements
This closes #1352.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1b4729e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1b4729e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1b4729e4
Branch: refs/heads/master
Commit: 1b4729e448394177cddeccb700f9215d6ec7b570
Parents: be6bcf2
Author: Bryan Rosander <br...@apache.org>
Authored: Wed Dec 21 14:19:57 2016 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 3 10:20:16 2017 -0500
----------------------------------------------------------------------
.../processors/standard/AttributesToJSON.java | 128 +++++++++----------
.../standard/TestAttributesToJSON.java | 111 +++++++++++++++-
2 files changed, 171 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/1b4729e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
index 78b8d58..cfa4cfe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -36,20 +37,19 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.BufferedOutputStream;
import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Map;
-import java.util.HashMap;
import java.util.Collections;
+import java.util.stream.Collectors;
@EventDriven
@SideEffectFree
@@ -66,7 +66,7 @@ public class AttributesToJSON extends AbstractProcessor {
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content";
- private static final String APPLICATION_JSON = "application/json";
+ public static final String APPLICATION_JSON = "application/json";
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
@@ -116,6 +116,10 @@ public class AttributesToJSON extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private volatile Set<String> attributesToRemove;
+ private volatile Set<String> attributes;
+ private volatile Boolean nullValueForEmptyString;
+ private volatile boolean destinationContent;
@Override
protected void init(final ProcessorInitializationContext context) {
@@ -149,55 +153,57 @@ public class AttributesToJSON extends AbstractProcessor {
* @return
* Map of values that are feed to a Jackson ObjectMapper
*/
- protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
- boolean includeCoreAttributes,
- boolean nullValForEmptyString) {
-
- Map<String, String> atsToWrite = new HashMap<>();
+ protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove, boolean nullValForEmptyString) {
+ Map<String, String> result;
+ //If list of attributes specified get only those attributes. Otherwise write them all
+ if (attributes != null) {
+ result = new HashMap<>(attributes.size());
+ for (String attribute : attributes) {
+ String val = ff.getAttribute(attribute);
+ if (val != null || nullValForEmptyString) {
+ result.put(attribute, val);
+ } else {
+ result.put(attribute, "");
+ }
+ }
+ } else {
+ Map<String, String> ffAttributes = ff.getAttributes();
+ result = new HashMap<>(ffAttributes.size());
+ for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
+ if (!attributesToRemove.contains(e.getKey())) {
+ result.put(e.getKey(), e.getValue());
+ }
+ }
+ }
+ return result;
+ }
+ private Set<String> buildAtrs(String atrList, Set<String> atrsToExclude) {
//If list of attributes specified get only those attributes. Otherwise write them all
if (StringUtils.isNotBlank(atrList)) {
String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
if (ats != null) {
+ Set<String> result = new HashSet<>(ats.length);
for (String str : ats) {
- String cleanStr = str.trim();
- String val = ff.getAttribute(cleanStr);
- if (val != null) {
- atsToWrite.put(cleanStr, val);
- } else {
- if (nullValForEmptyString) {
- atsToWrite.put(cleanStr, null);
- } else {
- atsToWrite.put(cleanStr, "");
- }
+ String trim = str.trim();
+ if (!atrsToExclude.contains(trim)) {
+ result.add(trim);
}
}
+ return result;
}
- } else {
- atsToWrite.putAll(ff.getAttributes());
}
-
- if (!includeCoreAttributes) {
- atsToWrite = removeCoreAttributes(atsToWrite);
- }
-
- return atsToWrite;
+ return null;
}
- /**
- * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile.
- *
- * @param atsToWrite
- * List of Attributes that have already been generated including the CoreAttributes
- *
- * @return
- * Difference of all attributes minus the CoreAttributes
- */
- protected Map<String, String> removeCoreAttributes(Map<String, String> atsToWrite) {
- for (CoreAttributes c : CoreAttributes.values()) {
- atsToWrite.remove(c.key());
- }
- return atsToWrite;
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values())
+ .map(CoreAttributes::key)
+ .collect(Collectors.toSet());
+ attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove);
+ nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
+ destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
}
@Override
@@ -207,33 +213,21 @@ public class AttributesToJSON extends AbstractProcessor {
return;
}
- final Map<String, String> atrList = buildAttributesMapForFlowFile(original,
- context.getProperty(ATTRIBUTES_LIST).getValue(),
- context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(),
- context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean());
+ final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString);
try {
-
- switch (context.getProperty(DESTINATION).getValue()) {
- case DESTINATION_ATTRIBUTE:
- FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
- objectMapper.writeValueAsString(atrList));
- session.transfer(atFlowfile, REL_SUCCESS);
- break;
- case DESTINATION_CONTENT:
- FlowFile conFlowfile = session.write(original, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (OutputStream outputStream = new BufferedOutputStream(out)) {
- outputStream.write(objectMapper.writeValueAsBytes(atrList));
- }
- }
- });
- conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
- session.transfer(conFlowfile, REL_SUCCESS);
- break;
+ if (destinationContent) {
+ FlowFile conFlowfile = session.write(original, (in, out) -> {
+ try (OutputStream outputStream = new BufferedOutputStream(out)) {
+ outputStream.write(objectMapper.writeValueAsBytes(atrList));
+ }
+ });
+ conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+ session.transfer(conFlowfile, REL_SUCCESS);
+ } else {
+ FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList));
+ session.transfer(atFlowfile, REL_SUCCESS);
}
-
} catch (JsonProcessingException e) {
getLogger().error(e.getMessage());
session.transfer(original, REL_FAILURE);
http://git-wip-us.apache.org/repos/asf/nifi/blob/1b4729e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
index 0f9ec26..5c8df9b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -19,13 +19,20 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -193,7 +200,6 @@ public class TestAttributesToJSON {
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
}
-
@Test
public void testAttribute_singleUserDefinedAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
@@ -279,4 +285,107 @@ public class TestAttributesToJSON {
assertTrue(val.size() == 1);
}
+ @Test
+ public void testAttribute_noIncludeCoreAttributesUserDefined() throws IOException {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+ testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " , " + CoreAttributes.PATH.key() + " ");
+ testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
+
+ ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+ FlowFile ff = session.create();
+ ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+ ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE);
+
+ testRunner.enqueue(ff);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+ .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+ testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+ String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+ .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, String> val = mapper.readValue(json, HashMap.class);
+ assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY));
+ assertEquals(1, val.size());
+ }
+
+ @Test
+ public void testAttribute_noIncludeCoreAttributesContent() throws IOException {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+ testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
+
+ ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+ FlowFile ff = session.create();
+ ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+ ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE);
+
+ testRunner.enqueue(ff);
+ testRunner.run();
+
+ testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, String> val = mapper.readValue(testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).toByteArray(), HashMap.class);
+ assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY));
+ assertEquals(1, val.size());
+ }
+
+ @Test
+ public void testAttribute_includeCoreAttributesContent() throws IOException {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+ testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
+ testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true");
+
+ ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+ FlowFile ff = session.create();
+
+ testRunner.enqueue(ff);
+ testRunner.run();
+
+ List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
+
+ testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+ testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = flowFilesForRelationship.get(0);
+
+ assertEquals(AttributesToJSON.APPLICATION_JSON, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+ Map<String, String> val = new ObjectMapper().readValue(flowFile.toByteArray(), HashMap.class);
+ assertEquals(3, val.size());
+ Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
+ val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
+ }
+
+ @Test
+ public void testAttribute_includeCoreAttributesAttribute() throws IOException {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+ testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true");
+
+ ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+ FlowFile ff = session.create();
+
+ testRunner.enqueue(ff);
+ testRunner.run();
+
+ List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
+
+ testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+ testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = flowFilesForRelationship.get(0);
+
+ assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+ Map<String, String> val = new ObjectMapper().readValue(flowFile.getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME), HashMap.class);
+ assertEquals(3, val.size());
+ Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
+ val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
+ }
}