You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/10/30 19:37:47 UTC

[1/5] nifi git commit: NIFI-1079 Create a new Attribute from the existing FlowFile attributes by taking either all of the existing attributes or a user defined list. The existing Attributes are converted to JSON and placed in a new Attribute on the exist

Repository: nifi
Updated Branches:
  refs/heads/master 5cc2b04b9 -> 1d97876f8


NIFI-1079 Create a new Attribute from the existing FlowFile attributes by taking
either all of the existing attributes or a user defined list. The
existing Attributes are converted to JSON and placed in a new Attribute
on the existing FlowFile as Attribute “JSONAttributes”

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19b7a4cc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19b7a4cc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19b7a4cc

Branch: refs/heads/master
Commit: 19b7a4cc7de9da3366aa9c1a15b97c683242b4d0
Parents: 5cc2b04
Author: Jeremy Dyer <jd...@gmail.com>
Authored: Wed Oct 28 16:20:48 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 30 14:29:29 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/AttributesToJSON.java   | 117 ++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestAttributesToJSON.java          | 177 +++++++++++++++++++
 3 files changed, 295 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
new file mode 100644
index 0000000..7098b6e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.*;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"JSON", "attributes"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " +
+        "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" +
+        "Attributes in the FlowFile are placed in the resulting JSON string. If only certain Attributes are desired you may" +
+        "specify a list of FlowFile Attributes that you want in the resulting JSON string")
+@WritesAttribute(attribute = "JSONAttributes", description = "JSON string of all the pre-existing attributes in the flowfile")
+public class AttributesToJSON extends AbstractProcessor {
+
+    public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
+    private static final String AT_LIST_SEPARATOR = ",";
+    private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = "";
+
+    public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
+            .name("Attributes List")
+            .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
+                    "in the flowfile an empty string will be output for that attritbute in the resulting JSON")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("Failed to add '" + JSON_ATTRIBUTE_NAME + "' attribute to the flowfile").build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ATTRIBUTES_LIST);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final String atList = context.getProperty(ATTRIBUTES_LIST).getValue();
+        Map<String, String> atsToWrite = null;
+
+        //If list of attributes specified get only those attributes. Otherwise write them all
+        if (atList != null && !StringUtils.isEmpty(atList)) {
+            atsToWrite = new HashMap<>();
+            String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR);
+            if (ats != null) {
+                for (String str : ats) {
+                    String cleanStr = str.trim();
+                    String val = original.getAttribute(cleanStr);
+                    if (val != null) {
+                        atsToWrite.put(cleanStr, val);
+                    } else {
+                        atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT);
+                    }
+                }
+            }
+        } else {
+            atsToWrite = original.getAttributes();
+        }
+
+        if (atsToWrite != null) {
+            if (atsToWrite.size() == 0) {
+                getLogger().debug("Flowfile contains no attributes to convert to JSON");
+            } else {
+                try {
+                    FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite));
+                    session.transfer(updated, REL_SUCCESS);
+                } catch (JsonProcessingException e) {
+                    getLogger().error(e.getMessage());
+                    session.transfer(original, REL_FAILURE);
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b12fb6f..8507c96 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+org.apache.nifi.processors.standard.AttributesToJSON
 org.apache.nifi.processors.standard.Base64EncodeContent
 org.apache.nifi.processors.standard.CompressContent
 org.apache.nifi.processors.standard.ControlRate

http://git-wip-us.apache.org/repos/asf/nifi/blob/19b7a4cc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
new file mode 100644
index 0000000..a057d15
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -0,0 +1,177 @@
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class TestAttributesToJSON {
+
+    private static Logger LOGGER;
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug");
+        LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class);
+    }
+
+    private static final String TEST_ATTRIBUTE_KEY = "TestAttribute";
+    private static final String TEST_ATTRIBUTE_VALUE = "TestValue";
+
+    @Test(expected = AssertionError.class)
+    public void testInvalidUserSuppliedAttributeList() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+
+        //Attribute list CANNOT be empty
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+    }
+
+    @Test
+    public void testInvalidJSONValueInAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        //Create attribute that contains an invalid JSON Character
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, "'badjson'");
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        //Expecting success transition because Jackson is taking care of escaping the bad JSON characters
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+    }
+
+
+    @Test
+    public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
+    }
+
+
+    @Test
+    public void testAttribute_singleUserDefinedAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
+        assertTrue(val.size() == 1);
+    }
+
+
+    @Test
+    public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " ");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+        assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
+        assertTrue(val.size() == 1);
+    }
+
+
+    @Test
+    public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+
+        //If a Attribute is requested but does not exist then it is placed in the JSON with an empty string
+        assertTrue(val.get("NonExistingAttribute").equals(""));
+        assertTrue(val.size() == 1);
+    }
+
+}


[3/5] nifi git commit: NIFI-1079 Check style and documentation based updates recommended by user bbende on Github pull request

Posted by bb...@apache.org.
NIFI-1079 Check style and documentation based updates recommended by user bbende on Github pull request

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aef0d8fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aef0d8fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aef0d8fe

Branch: refs/heads/master
Commit: aef0d8fe9b994ce333d14083d7f39edc77622d72
Parents: 217b104
Author: Jeremy Dyer <jd...@gmail.com>
Authored: Fri Oct 30 12:37:11 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 30 14:30:42 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/AttributesToJSON.java   | 74 ++++++++++++++------
 .../standard/TestAttributesToJSON.java          | 49 ++++++++-----
 2 files changed, 86 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aef0d8fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
index 950b8d3..89bb0b6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -1,15 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.nifi.processors.standard;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -18,32 +43,38 @@ import org.apache.nifi.stream.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
 
 @EventDriven
 @SideEffectFree
 @SupportsBatching
-@Tags({"json", "attributes"})
+@Tags({"json", "attributes", "flowfile"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " +
-        "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" +
-        "Attributes in the FlowFile are placed in the resulting JSON string. If only certain Attributes are desired you may" +
-        "specify a list of FlowFile Attributes that you want in the resulting JSON string")
-@WritesAttribute(attribute = "JSONAttributes", description = "JSON string of all the pre-existing attributes in the flowfile")
+@CapabilityDescription("Generates a JSON representation of the input FlowFile Attributes. The resulting JSON " +
+        "can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content.")
+@WritesAttribute(attribute = "JSONAttributes", description = "JSON representation of Attributes")
 public class AttributesToJSON extends AbstractProcessor {
 
-    public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
+    public static final String JSON_ATTRIBUTE_NAME = "JSONAttributes";
     private static final String AT_LIST_SEPARATOR = ",";
 
     public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
     public static final String DESTINATION_CONTENT = "flowfile-content";
+    private final String APPLICATION_JSON = "application/json";
 
 
     public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
             .name("Attributes List")
-            .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' " +
-                    "attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
-                    "in the flowfile an empty string will be output for that attritbute in the resulting JSON")
+            .description("Comma separated list of attributes to be included in the resulting JSON. If this value " +
+                    "is left empty then all existing Attributes will be included. This list of attributes is " +
+                    "case sensitive. If an attribute specified in the list is not found it will be be emitted " +
+                    "to the resulting JSON with an empty string or NULL value.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -51,8 +82,8 @@ public class AttributesToJSON extends AbstractProcessor {
     public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
             .name("Destination")
             .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
-                    "or written in the flowfile content. " +
-                    "Writing to flowfile content will overwrite any existing flowfile content.")
+                    "or written in the flowfile content. Writing to flowfile content will overwrite any " +
+                    "existing flowfile content.")
             .required(true)
             .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
             .defaultValue(DESTINATION_ATTRIBUTE)
@@ -68,9 +99,9 @@ public class AttributesToJSON extends AbstractProcessor {
             .build();
 
     public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
-            .name("Null Value")
-            .description("If an Attribute is value is empty or not present this property determines if an empty string" +
-                    "or true NULL value is present in the resulting JSON output")
+            .name(("If true a non existing or empty attribute will be NULL in the resulting JSON. If false an empty " +
+                    "string will be placed in the JSON"))
+            .description("")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -78,9 +109,9 @@ public class AttributesToJSON extends AbstractProcessor {
 
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
-            .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build();
+            .description("Successfully converted attributes to JSON").build();
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
-            .description("Failed to add '" + JSON_ATTRIBUTE_NAME + "' attribute to the flowfile").build();
+            .description("Failed to convert attributes to JSON").build();
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
@@ -116,7 +147,7 @@ public class AttributesToJSON extends AbstractProcessor {
      * Builds the Map of attributes that should be included in the JSON that is emitted from this process.
      *
      * @return
-     *  Map<String, String> of values that are feed to a Jackson ObjectMapper
+     *  Map of values that are feed to a Jackson ObjectMapper
      */
     protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
                                                                 boolean includeCoreAttributes,
@@ -198,6 +229,7 @@ public class AttributesToJSON extends AbstractProcessor {
                             }
                         }
                     });
+                    conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
                     session.transfer(conFlowfile, REL_SUCCESS);
                     break;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aef0d8fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
index 1624c4b..0f9ec26 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.nifi.processors.standard;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -63,8 +80,8 @@ public class TestAttributesToJSON {
         testRunner.run();
 
         //Expecting success transition because Jackson is taking care of escaping the bad JSON characters
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
 
@@ -93,8 +110,8 @@ public class TestAttributesToJSON {
         testRunner.run();
 
         //Expecting success transition because Jackson is taking care of escaping the bad JSON characters
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
 
@@ -123,8 +140,8 @@ public class TestAttributesToJSON {
         testRunner.run();
 
         //Expecting success transition because Jackson is taking care of escaping the bad JSON characters
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
     }
@@ -143,8 +160,8 @@ public class TestAttributesToJSON {
         testRunner.enqueue(ff);
         testRunner.run();
 
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
 
@@ -169,8 +186,8 @@ public class TestAttributesToJSON {
         testRunner.enqueue(ff);
         testRunner.run();
 
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
         testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
@@ -190,8 +207,8 @@ public class TestAttributesToJSON {
         testRunner.enqueue(ff);
         testRunner.run();
 
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
 
@@ -218,8 +235,8 @@ public class TestAttributesToJSON {
         testRunner.enqueue(ff);
         testRunner.run();
 
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
 
@@ -246,8 +263,8 @@ public class TestAttributesToJSON {
         testRunner.enqueue(ff);
         testRunner.run();
 
-        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
-                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
+                .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
         testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
         testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
 


[2/5] nifi git commit: NIFI-1079 Added Destination Property to control if JSON goes to Attribute or Content of FlowFile. Added Include Core Attributes Property to control if FlowFile CoreAttributes are included in the JSON output or not. Added Null value

Posted by bb...@apache.org.
NIFI-1079 Added Destination Property to control if JSON goes to Attribute or
Content of FlowFile. Added Include Core Attributes Property to control
if FlowFile CoreAttributes are included in the JSON output or not.
Added Null value for Empty String flag to control if empty values in
the JSON are empty string or true NULL values. Added more tests and
minor text refactoring per Github comments

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/217b1049
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/217b1049
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/217b1049

Branch: refs/heads/master
Commit: 217b1049cf73c8ecef7dd76a954f07a7c7224cc6
Parents: 19b7a4c
Author: Jeremy Dyer <jd...@gmail.com>
Authored: Wed Oct 28 20:53:46 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 30 14:30:25 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/AttributesToJSON.java   | 151 +++++++++++++++----
 .../standard/TestAttributesToJSON.java          | 116 ++++++++++++--
 2 files changed, 224 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/217b1049/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
index 7098b6e..950b8d3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -8,16 +8,22 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.*;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedOutputStream;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.*;
 
 @EventDriven
 @SideEffectFree
 @SupportsBatching
-@Tags({"JSON", "attributes"})
+@Tags({"json", "attributes"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " +
         "The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" +
@@ -28,16 +34,48 @@ public class AttributesToJSON extends AbstractProcessor {
 
     public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
     private static final String AT_LIST_SEPARATOR = ",";
-    private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = "";
+
+    public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
+    public static final String DESTINATION_CONTENT = "flowfile-content";
+
 
     public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
             .name("Attributes List")
-            .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
+            .description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' " +
+                    "attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
                     "in the flowfile an empty string will be output for that attritbute in the resulting JSON")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+            .name("Destination")
+            .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
+                    "or written in the flowfile content. " +
+                    "Writing to flowfile content will overwrite any existing flowfile content.")
+            .required(true)
+            .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
+            .defaultValue(DESTINATION_ATTRIBUTE)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
+            .name("Include Core Attributes")
+            .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " +
+                    "contained in every FlowFile should be included in the final JSON value generated.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
+            .name("Null Value")
+            .description("If an Attribute is value is empty or not present this property determines if an empty string" +
+                    "or true NULL value is present in the resulting JSON output")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build();
@@ -52,6 +90,9 @@ public class AttributesToJSON extends AbstractProcessor {
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ATTRIBUTES_LIST);
+        properties.add(DESTINATION);
+        properties.add(INCLUDE_CORE_ATTRIBUTES);
+        properties.add(NULL_VALUE_FOR_EMPTY_STRING);
         this.properties = Collections.unmodifiableList(properties);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -70,48 +111,100 @@ public class AttributesToJSON extends AbstractProcessor {
         return relationships;
     }
 
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final FlowFile original = session.get();
-        if (original == null) {
-            return;
-        }
 
-        final String atList = context.getProperty(ATTRIBUTES_LIST).getValue();
-        Map<String, String> atsToWrite = null;
+    /**
+     * Builds the Map of attributes that should be included in the JSON that is emitted from this process.
+     *
+     * @return
+     *  Map<String, String> of values that are feed to a Jackson ObjectMapper
+     */
+    protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
+                                                                boolean includeCoreAttributes,
+                                                                boolean nullValForEmptyString) {
+
+        Map<String, String> atsToWrite = new HashMap<>();
 
         //If list of attributes specified get only those attributes. Otherwise write them all
-        if (atList != null && !StringUtils.isEmpty(atList)) {
-            atsToWrite = new HashMap<>();
-            String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR);
+        if (StringUtils.isNotBlank(atrList)) {
+            String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
             if (ats != null) {
                 for (String str : ats) {
                     String cleanStr = str.trim();
-                    String val = original.getAttribute(cleanStr);
+                    String val = ff.getAttribute(cleanStr);
                     if (val != null) {
                         atsToWrite.put(cleanStr, val);
                     } else {
-                        atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT);
+                        if (nullValForEmptyString) {
+                            atsToWrite.put(cleanStr, null);
+                        } else {
+                            atsToWrite.put(cleanStr, "");
+                        }
                     }
                 }
             }
         } else {
-            atsToWrite = original.getAttributes();
+            atsToWrite.putAll(ff.getAttributes());
         }
 
-        if (atsToWrite != null) {
-            if (atsToWrite.size() == 0) {
-                getLogger().debug("Flowfile contains no attributes to convert to JSON");
-            } else {
-                try {
-                    FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite));
-                    session.transfer(updated, REL_SUCCESS);
-                } catch (JsonProcessingException e) {
-                    getLogger().error(e.getMessage());
-                    session.transfer(original, REL_FAILURE);
-                }
-            }
+        if (!includeCoreAttributes) {
+            atsToWrite = removeCoreAttributes(atsToWrite);
+        }
+
+        return atsToWrite;
+    }
+
+    /**
+     * Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile.
+     *
+     * @param atsToWrite
+     *  List of Attributes that have already been generated including the CoreAttributes
+     *
+     * @return
+     *  Difference of all attributes minus the CoreAttributes
+     */
+    protected Map<String, String> removeCoreAttributes(Map<String, String> atsToWrite) {
+        for (CoreAttributes c : CoreAttributes.values()) {
+            atsToWrite.remove(c.key());
+        }
+        return atsToWrite;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
         }
 
+        final Map<String, String> atrList = buildAttributesMapForFlowFile(original,
+                context.getProperty(ATTRIBUTES_LIST).getValue(),
+                context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(),
+                context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean());
+
+        try {
+
+            switch (context.getProperty(DESTINATION).getValue()) {
+                case DESTINATION_ATTRIBUTE:
+                    FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
+                            objectMapper.writeValueAsString(atrList));
+                    session.transfer(atFlowfile, REL_SUCCESS);
+                    break;
+                case DESTINATION_CONTENT:
+                    FlowFile conFlowfile = session.write(original, new StreamCallback() {
+                        @Override
+                        public void process(InputStream in, OutputStream out) throws IOException {
+                            try (OutputStream outputStream = new BufferedOutputStream(out)) {
+                                outputStream.write(objectMapper.writeValueAsBytes(atrList));
+                            }
+                        }
+                    });
+                    session.transfer(conFlowfile, REL_SUCCESS);
+                    break;
+            }
+
+        } catch (JsonProcessingException e) {
+            getLogger().error(e.getMessage());
+            session.transfer(original, REL_FAILURE);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/217b1049/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
index a057d15..1624c4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java
@@ -6,28 +6,17 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 
 public class TestAttributesToJSON {
 
-    private static Logger LOGGER;
-
-    static {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug");
-        LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class);
-    }
-
     private static final String TEST_ATTRIBUTE_KEY = "TestAttribute";
     private static final String TEST_ATTRIBUTE_VALUE = "TestValue";
 
@@ -45,9 +34,84 @@ public class TestAttributesToJSON {
         testRunner.run();
     }
 
+    @Test(expected = AssertionError.class)
+    public void testInvalidIncludeCoreAttributesProperty() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "val1,val2");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "maybe");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+    }
+
+    @Test
+    public void testNullValueForEmptyAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
+        final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey";
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+        testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "true");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        //Expecting success transition because Jackson is taking care of escaping the bad JSON characters
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        //Make sure that the value is a true JSON null for the non existing attribute
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+
+        assertNull(val.get(NON_PRESENT_ATTRIBUTE_KEY));
+    }
+
+    @Test
+    public void testEmptyStringValueForEmptyAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
+        final String NON_PRESENT_ATTRIBUTE_KEY = "NonExistingAttributeKey";
+        testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+        testRunner.setProperty(AttributesToJSON.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        //Expecting success transition because Jackson is taking care of escaping the bad JSON characters
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+
+        //Make sure that the value is a true JSON null for the non existing attribute
+        String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
+                .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> val = mapper.readValue(json, HashMap.class);
+
+        assertEquals(val.get(NON_PRESENT_ATTRIBUTE_KEY), "");
+    }
+
     @Test
     public void testInvalidJSONValueInAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -67,8 +131,9 @@ public class TestAttributesToJSON {
 
 
     @Test
-    public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception {
+    public void testAttributes_emptyListUserSpecifiedAttributes() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -93,9 +158,30 @@ public class TestAttributesToJSON {
 
 
     @Test
+    public void testContent_emptyListUserSpecifiedAttributes() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
+        testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
+
+        ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+
+        testRunner.enqueue(ff);
+        testRunner.run();
+
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
+                assertAttributeNotExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
+        testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
+        testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
+    }
+
+
+    @Test
     public void testAttribute_singleUserDefinedAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
         testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -123,6 +209,7 @@ public class TestAttributesToJSON {
     public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
         testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " ");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();
@@ -150,6 +237,7 @@ public class TestAttributesToJSON {
     public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
         testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute");
+        testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
         FlowFile ff = session.create();


[4/5] nifi git commit: NIFI-1079 Replacing Name for NULL_VALUE_FOR_EMPTY_STRING as I had accidentally removed it.

Posted by bb...@apache.org.
NIFI-1079 Replacing Name for NULL_VALUE_FOR_EMPTY_STRING as I had accidentally removed it.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/eb389cf8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/eb389cf8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/eb389cf8

Branch: refs/heads/master
Commit: eb389cf84cd5f610b5ea2235a4eb78de47f6417e
Parents: aef0d8f
Author: Jeremy Dyer <jd...@gmail.com>
Authored: Fri Oct 30 14:15:59 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 30 14:30:56 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/AttributesToJSON.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/eb389cf8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
index 89bb0b6..0ab3ca7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java
@@ -99,9 +99,9 @@ public class AttributesToJSON extends AbstractProcessor {
             .build();
 
     public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
-            .name(("If true a non existing or empty attribute will be NULL in the resulting JSON. If false an empty " +
-                    "string will be placed in the JSON"))
-            .description("")
+            .name(("Null Value"))
+            .description("If true a non existing or empty attribute will be NULL in the resulting JSON. If false an empty " +
+                    "string will be placed in the JSON")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")


[5/5] nifi git commit: NIFI-1079 Pull Request functions as described and passes contrib-check. This closes #109.

Posted by bb...@apache.org.
NIFI-1079 Pull Request functions as described and passes contrib-check. This closes #109.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1d97876f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1d97876f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1d97876f

Branch: refs/heads/master
Commit: 1d97876f82cdd37b2bab5e99ffc170f483c32037
Parents: eb389cf
Author: Bryan Bende <bb...@apache.org>
Authored: Fri Oct 30 14:35:35 2015 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 30 14:36:46 2015 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------