You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/09 05:26:38 UTC

[04/50] [abbrv] nifi git commit: NIFI-1079 Added Destination Property to control if JSON goes to Attribute or Content of FlowFile. Added Include Core Attributes Property to control if FlowFile CoreAttributes are included in the JSON output or not. Added

NIFI-1079 Added Destination Property to control if JSON goes to Attribute or
Content of FlowFile. Added Include Core Attributes Property to control
if FlowFile CoreAttributes are included in the JSON output or not.
Added Null value for Empty String flag to control if empty values in
the JSON are empty string or true NULL values. Added more tests and
minor text refactoring per Github comments

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/217b1049
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/217b1049
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/217b1049

Branch: refs/heads/NIFI-1073
Commit: 217b1049cf73c8ecef7dd76a954f07a7c7224cc6
Parents: 19b7a4c
Author: Jeremy Dyer <jd...@gmail.com>
Authored: Wed Oct 28 20:53:46 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 30 14:30:25 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/AttributesToJSON.java   | 151 +++++++++++++++----
 .../standard/TestAttributesToJSON.java          | 116 ++++++++++++--
 2 files changed, 224 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/217b1049/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
index 7098b6e..950b8d3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -8,16 +8,22 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.*;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedOutputStream;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.*;
 
 @EventDriven
 @SideEffectFree
 @SupportsBatching
-@Tags({"JSON", "attributes"})
+@Tags({"json", "attributes"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " +
         "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" +
@@ -28,16 +34,48 @@ public class AttributesToJSON extends AbstractProcessor {
 
     public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
     private static final String AT_LIST_SEPARATOR = ",";
-    private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = "";
+
+    public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
+    public static final String DESTINATION_CONTENT = "flowfile-content";
+
 
     public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
             .name("Attributes List")
-            .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
+            .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' " +
+                    "attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
                     "in the flowfile an empty string will be output for that attritbute in the resulting JSON")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("Destination")
+            .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
+                    "or written in the flowfile content. " +
+                    "Writing to flowfile content will overwrite any existing flowfile content.")
+            .required(true)
+            .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
+            .defaultValue(DESTINATION_ATTRIBUTE)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
+            .name("Include Core Attributes")
+            .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " +
+                    "contained in every FlowFile should be included in the final JSON value generated.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
+            .name("Null Value")
+            .description("If an Attribute is value is empty or not present this property determines if an empty string" +
+                    "or true NULL value is present in the resulting JSON output")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build();
@@ -52,6 +90,9 @@ public class AttributesToJSON extends AbstractProcessor {
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ATTRIBUTES_LIST);
+        properties.add(DESTINATION);
+        properties.add(INCLUDE_CORE_ATTRIBUTES);
+        properties.add(NULL_VALUE_FOR_EMPTY_STRING);
         this.properties = Collections.unmodifiableList(properties);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -70,48 +111,100 @@ public class AttributesToJSON extends AbstractProcessor {
         return relationships;
     }
 
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final FlowFile original = session.get();
-        if (original == null) {
-            return;
-        }
 
-        final String atList = context.getProperty(ATTRIBUTES_LIST).getValue();
-        Map<String, String> atsToWrite = null;
+    /**
+     * Builds the Map of attributes that should be included in the JSON that is emitted from this process.
+     *
+     * @return
+     *  Map<String, String> of values that are fed to a Jackson ObjectMapper
+     */
+    protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
+                                                                boolean includeCoreAttributes,
+                                                                boolean nullValForEmptyString) {
+
+        Map<String, String> atsToWrite = new HashMap<>();
 
         //If list of attributes specified get only those attributes. Otherwise write them all
-        if (atList != null && !StringUtils.isEmpty(atList)) {
-            atsToWrite = new HashMap<>();
-            String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR);
+        if (StringUtils.isNotBlank(atrList)) {
+            String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
             if (ats != null) {
                 for (String str : ats) {
                     String cleanStr = str.trim();
-                    String val = original.getAttribute(cleanStr);
+                    String val = ff.getAttribute(cleanStr);
                     if (val != null) {
                         atsToWrite.put(cleanStr, val);
                     } else {
-                        atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT);
+                        if (nullValForEmptyString) {
+                            atsToWrite.put(cleanStr, null);
+                        } else {
+                            atsToWrite.put(cleanStr, "");
+                        }
                     }
                 }
             }
         } else {
-            atsToWrite = original.getAttributes();
+            atsToWrite.putAll(ff.getAttributes());
         }
 
-        if (atsToWrite != null) {
-            if (atsToWrite.size() == 0) {
-                getLogger().debug("Flowfile contains no attributes to convert to JSON");
-            } else {
-                try {
-                    FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite));
-                    session.transfer(updated, REL_SUCCESS);
-                } catch (JsonProcessingException e) {
-                    getLogger().error(e.getMessage());
-                    session.transfer(original, REL_FAILURE);
-                }
-            }
+        if (!includeCoreAttributes) {
+            atsToWrite = removeCoreAttributes(atsToWrite);
+        }
+
+        return atsToWrite;
+    }
+
+    /**
+     * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile.
+     *
+     * @param atsToWrite
+     *  Map of Attributes that have already been generated, including the CoreAttributes
+     *
+     * @return
+     *  Difference of all attributes minus the CoreAttributes
+     */
+    protected Map<String, String> removeCoreAttributes(Map<String, String> atsToWrite) {
+        for (CoreAttributes c : CoreAttributes.values()) {
+            atsToWrite.remove(c.key());
+        }
+        return atsToWrite;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
         }
 
+        final Map<String, String> atrList = buildAttributesMapForFlowFile(original,
+                context.getProperty(ATTRIBUTES_LIST).getValue(),
+                context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(),
+                context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean());
+
+        try {
+
+            switch (context.getProperty(DESTINATION).getValue()) {
+                case DESTINATION_ATTRIBUTE:
+                    FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
+                            objectMapper.writeValueAsString(atrList));
+                    session.transfer(atFlowfile, REL_SUCCESS);
+                    break;
+                case DESTINATION_CONTENT:
+                    FlowFile conFlowfile = session.write(original, new StreamCallback() {
+                        @Override
+                        public void process(InputStream in, OutputStream out) throws IOException {
+                            try (OutputStream outputStream = new BufferedOutputStream(out)) {
+                                outputStream.write(objectMapper.writeValueAsBytes(atrList));
+                            }
+                        }
+                    });
+                    session.transfer(conFlowfile, REL_SUCCESS);
+                    break;
+            }
+
+        } catch (JsonProcessingException e) {
+            getLogger().error(e.getMessage());
+            session.transfer(original, REL_FAILURE);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/217b1049/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
index a057d15..1624c4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -6,28 +6,17 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 
 public class TestAttributesToJSON {
 
-    private static Logger LOGGER;
-
-    static {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug");
-        LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class);
-    }
-
     private static final String TEST_ATTRIBUTE_KEY = "TestAttribute";
     private static final String TEST_ATTRIBUTE_VALUE = "TestValue";
 
@@ -45,9 +34,84 @@ public class TestAttributesToJSON {
         testRunner.run();
     }
 
+    @Test(expected = AssertionError.class)
+    public void testInvalidIncludeCoreAttributesProperty() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "val1,val2");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "maybe");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+    }
+
+    @Test
+    public void testNullValueForEmptyAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
+        final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey";
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+        testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "true");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        //Expecting success transition because a non existing attribute is still written to the JSON as a null value
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        //Make sure that the value is a true JSON null for the non existing attribute
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+
+        assertNull(val.get(NON_PRESENT_ATTRIBUTE_KEY));
+    }
+
+    @Test
+    public void testEmptyStringValueForEmptyAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
+        final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey";
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+        testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        //Expecting success transition because a non existing attribute is still written to the JSON as an empty string
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        //Make sure that the value is an empty string for the non existing attribute
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+
+        assertEquals(val.get(NON_PRESENT_ATTRIBUTE_KEY), "");
+    }
+
     @Test
     public void testInvalidJSONValueInAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -67,8 +131,9 @@ public class TestAttributesToJSON {
 
 
     @Test
-    public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception {
+    public void testAttributes_emptyListUserSpecifiedAttributes() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -93,9 +158,30 @@ public class TestAttributesToJSON {
 
 
     @Test
+    public void testContent_emptyListUserSpecifiedAttributes() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
+    }
+
+
+    @Test
     public void testAttribute_singleUserDefinedAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
         testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -123,6 +209,7 @@ public class TestAttributesToJSON {
     public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
         testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " ");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -150,6 +237,7 @@ public class TestAttributesToJSON {
     public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
         testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();