Posted to commits@nifi.apache.org by ij...@apache.org on 2018/06/11 04:35:34 UTC

nifi git commit: NIFI-5231 Added RecordStats processor.

Repository: nifi
Updated Branches:
  refs/heads/master 5a39d2a81 -> 1803c15bc


NIFI-5231 Added RecordStats processor.

NIFI-5231 Minor fixes from code review.

NIFI-5231 Added documentation and made record_count a constant.

NIFI-5231 Added changes requested in a code review.

NIFI-5231 Added changes based on code review.

This closes #2737.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1803c15b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1803c15b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1803c15b

Branch: refs/heads/master
Commit: 1803c15bcd552ca05cbb116ed0ffb8a7fe92519a
Parents: 5a39d2a
Author: Mike Thomsen <mi...@gmail.com>
Authored: Wed May 23 19:30:11 2018 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Mon Jun 11 13:35:01 2018 +0900

----------------------------------------------------------------------
 .../standard/CalculateRecordStats.java          | 237 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      |  47 ++++
 .../standard/TestCalculateRecordStats.groovy    | 151 ++++++++++++
 4 files changed, 436 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java
new file mode 100644
index 0000000..93029a9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import com.google.common.collect.Lists;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Tags({ "record", "stats", "metrics" })
+@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
+        "user-defined criteria on subsets of the record set.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+    @WritesAttribute(attribute = CalculateRecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile."),
+    @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.count", description = "A count of the records that contain a value for the user defined property."),
+    @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.<value>.count",
+            description = "Each value discovered for the user defined property will have its own count attribute. " +
+                    "Total number of top N value counts to be added is defined by the limit configuration.")
+})
+public class CalculateRecordStats extends AbstractProcessor {
+    static final String RECORD_COUNT_ATTR = "record_count";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-stats-reader")
+        .displayName("Record Reader")
+        .description("A record reader to use for reading the records.")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+        .name("record-stats-limit")
+        .description("Limit the number of individual stats that are returned for each record path to the top N results.")
+        .required(true)
+        .defaultValue("10")
+        .addValidator(StandardValidators.INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("If a flowfile is successfully processed, it goes here.")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a flowfile fails to be processed, it goes here.")
+        .build();
+
+    private RecordPathCache cache;
+
+    static final Set<Relationship> RELATIONSHIPS;
+    static final List<PropertyDescriptor> PROPERTIES;
+
+    static {
+        Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.add(RECORD_READER);
+        _temp.add(LIMIT);
+        PROPERTIES = Collections.unmodifiableList(_temp);
+    }
+
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .displayName(propertyDescriptorName)
+            .dynamic(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    }
+
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onEnabled(ProcessContext context) {
+        cache = new RecordPathCache(25);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        try {
+            Map<String, RecordPath> paths = getRecordPaths(context, input);
+            Map<String, String> stats = getStats(input, paths, context, session);
+
+            input = session.putAllAttributes(input, stats);
+
+            session.transfer(input, REL_SUCCESS);
+
+        } catch (Exception ex) {
+            getLogger().error("Error processing stats.", ex);
+            session.transfer(input, REL_FAILURE);
+        }
+
+    }
+
+    protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
+        return context.getProperties().keySet()
+            .stream().filter(p -> p.isDynamic())
+            .collect(Collectors.toMap(
+                e -> e.getName(),
+                e ->  {
+                    String val = context.getProperty(e).evaluateAttributeExpressions(flowFile).getValue();
+                    return cache.getCompiled(val);
+                })
+            );
+    }
+
+    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
+        try (InputStream is = session.read(flowFile)) {
+            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final Integer limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();
+            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
+
+            Map<String, Integer> retVal = new HashMap<>();
+            Record record;
+
+            int recordCount = 0;
+            List<String> baseKeys = new ArrayList<>();
+            while ((record = reader.nextRecord()) != null) {
+                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
+                    RecordPathResult result = entry.getValue().evaluate(record);
+                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
+                    if (value.isPresent() && value.get().getValue() != null) {
+                        String approxValue = value.get().getValue().toString();
+                        String baseKey = String.format("recordStats.%s", entry.getKey());
+                        String key = String.format("%s.%s", baseKey, approxValue);
+                        Integer stat = retVal.getOrDefault(key, 0);
+                        Integer baseStat = retVal.getOrDefault(baseKey, 0);
+                        stat++;
+                        baseStat++;
+
+                        retVal.put(key, stat);
+                        retVal.put(baseKey, baseStat);
+
+                        if (!baseKeys.contains(baseKey)) {
+                            baseKeys.add(baseKey);
+                        }
+                    }
+                }
+
+                recordCount++;
+            }
+
+            retVal = filterBySize(retVal, limit, baseKeys);
+
+            retVal.put(RECORD_COUNT_ATTR, recordCount);
+
+            return retVal.entrySet().stream()
+                .collect(Collectors.toMap(
+                    e -> e.getKey(),
+                    e -> e.getValue().toString()
+                ));
+        } catch (Exception e) {
+            getLogger().error("Could not read flowfile", e);
+            throw new ProcessException(e);
+        }
+    }
+
+    protected Map<String, Integer> filterBySize(Map<String, Integer> values, Integer limit, List<String> baseKeys) {
+        Map<String, Integer> toFilter = values.entrySet().stream()
+            .filter(e -> !baseKeys.contains(e.getKey()))
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+        Map<String, Integer> retVal = values.entrySet().stream()
+            .filter((e -> baseKeys.contains(e.getKey())))
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+
+        List<Map.Entry<String, Integer>> _flat = new ArrayList<>(toFilter.entrySet());
+        _flat.sort(Map.Entry.comparingByValue());
+        _flat = Lists.reverse(_flat);
+        for (int index = 0; index < _flat.size() && index < limit; index++) {
+            retVal.put(_flat.get(index).getKey(), _flat.get(index).getValue());
+        }
+
+        return retVal;
+    }
+}
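
Note: the filterBySize() method above always keeps the per-property totals ("base keys") and trims the per-value counts down to the top N by count. A minimal standalone sketch of that trimming, using plain java.util collections and hypothetical example values (not part of the commit):

import java.util.*;
import java.util.stream.Collectors;

public class TopNTrimSketch {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("recordStats.sport", 6);            // base key: always kept
        counts.put("recordStats.sport.Soccer", 3);
        counts.put("recordStats.sport.Football", 2);
        counts.put("recordStats.sport.Basketball", 1);

        int limit = 2;                                 // corresponds to the Limit property
        List<String> baseKeys = Collections.singletonList("recordStats.sport");

        // Keep the base keys unconditionally.
        Map<String, Integer> kept = counts.entrySet().stream()
            .filter(e -> baseKeys.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // Sort the remaining value counts descending and keep only the top `limit`.
        counts.entrySet().stream()
            .filter(e -> !baseKeys.contains(e.getKey()))
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
            .limit(limit)
            .forEach(e -> kept.put(e.getKey(), e.getValue()));

        // Prints something like {recordStats.sport=6, recordStats.sport.Soccer=3, recordStats.sport.Football=2}
        System.out.println(kept);
    }
}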

http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8195963..72f0768 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,6 +15,7 @@
 org.apache.nifi.processors.standard.AttributesToJSON
 org.apache.nifi.processors.standard.AttributesToCSV
 org.apache.nifi.processors.standard.Base64EncodeContent
+org.apache.nifi.processors.standard.CalculateRecordStats
 org.apache.nifi.processors.standard.CompressContent
 org.apache.nifi.processors.standard.ControlRate
 org.apache.nifi.processors.standard.ConvertCharacterSet
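
Note: the one-line addition above is what makes the new processor discoverable. NiFi locates processor implementations through the standard java.util.ServiceLoader mechanism backed by this META-INF/services file. A minimal illustration of that lookup, assuming nifi-api and the standard processors are on the classpath (not part of the commit):

import java.util.ServiceLoader;
import org.apache.nifi.processor.Processor;

public class ListProcessorsSketch {
    public static void main(String[] args) {
        // Every class listed in META-INF/services/org.apache.nifi.processor.Processor shows up here,
        // including org.apache.nifi.processors.standard.CalculateRecordStats after this commit.
        for (Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}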

http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html
new file mode 100644
index 0000000..411d7a3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html
@@ -0,0 +1,47 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>CalculateRecordStats</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor takes in a record set and produces both an overall record count and counts for user-defined
+    subsets of the records. The subsets are configured as dynamic properties that map a property name to a record path.
+    Record path counts are provided at two levels:</p>
+    <ul>
+        <li>The overall count of all records that successfully evaluated a record path.</li>
+        <li>A breakdown of counts of unique values that matched the record path operation.</li>
+    </ul>
+    <p>Consider the following record structure:</p>
+    <pre>
+        {
+            "sport": "Soccer",
+            "name": "John Smith"
+        }
+    </pre>
+    <p>A valid mapping here would be <em>sport => /sport</em>.</p>
+    <p>For a record set like that with five entries, three instances of Soccer and two instances of Football, it would set the following
+    attributes:</p>
+    <ul>
+        <li>record_count: 5</li>
+        <li>recordStats.sport: 5</li>
+        <li>recordStats.sport.Soccer: 3</li>
+        <li>recordStats.sport.Football: 2</li>
+    </ul>
+</body>
+</html>
\ No newline at end of file
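
Note: the record paths supplied through dynamic properties may also point into nested records, as the tests below do with /person/sport. A hypothetical expectation for a six-record set (three Soccer, two Football, one Basketball), written as plain Java for illustration and mirroring the first test case below (not part of the commit):

import java.util.LinkedHashMap;
import java.util.Map;

public class ExpectedAttributesSketch {
    public static void main(String[] args) {
        // Attributes CalculateRecordStats would write for a dynamic property sport => /person/sport.
        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("record_count", "6");
        expected.put("recordStats.sport", "6");
        expected.put("recordStats.sport.Soccer", "3");
        expected.put("recordStats.sport.Football", "2");
        expected.put("recordStats.sport.Basketball", "1");
        expected.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}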

http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy
new file mode 100644
index 0000000..6e3b1fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.MapRecord
+import org.apache.nifi.serialization.record.MockRecordParser
+import org.apache.nifi.serialization.record.RecordField
+import org.apache.nifi.serialization.record.RecordFieldType
+import org.apache.nifi.serialization.record.RecordSchema
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+class TestCalculateRecordStats {
+    TestRunner runner
+    MockRecordParser recordParser
+    RecordSchema personSchema
+
+    @Before
+    void setup() {
+        runner = TestRunners.newTestRunner(CalculateRecordStats.class)
+        recordParser = new MockRecordParser()
+        runner.addControllerService("recordReader", recordParser)
+        runner.setProperty(CalculateRecordStats.RECORD_READER, "recordReader")
+        runner.enableControllerService(recordParser)
+        runner.assertValid()
+
+        recordParser.addSchemaField("id", RecordFieldType.INT)
+        List<RecordField> personFields = new ArrayList<>()
+        RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType())
+        RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType())
+        RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType())
+        personFields.add(nameField)
+        personFields.add(ageField)
+        personFields.add(sportField)
+        personSchema = new SimpleRecordSchema(personFields)
+        recordParser.addSchemaField("person", RecordFieldType.RECORD)
+    }
+
+    @Test
+    void testNoNullOrEmptyRecordFields() {
+        def sports = [ "Soccer", "Soccer", "Soccer", "Football", "Football", "Basketball" ]
+        def expectedAttributes = [
+            "recordStats.sport.Soccer": "3",
+            "recordStats.sport.Football": "2",
+            "recordStats.sport.Basketball": "1",
+            "recordStats.sport": "6",
+            "record_count": "6"
+        ]
+
+        commonTest([ "sport": "/person/sport"], sports, expectedAttributes)
+    }
+
+    @Test
+    void testWithNullFields() {
+        def sports = [ "Soccer", null, null, "Football", null, "Basketball" ]
+        def expectedAttributes = [
+            "recordStats.sport.Soccer": "1",
+            "recordStats.sport.Football": "1",
+            "recordStats.sport.Basketball": "1",
+            "recordStats.sport": "3",
+            "record_count": "6"
+        ]
+
+        commonTest([ "sport": "/person/sport"], sports, expectedAttributes)
+    }
+
+    @Test
+    void testWithFilters() {
+        def sports = [ "Soccer", "Soccer", "Soccer", "Football", "Football", "Basketball" ]
+        def expectedAttributes = [
+            "recordStats.sport.Soccer": "3",
+            "recordStats.sport.Basketball": "1",
+            "recordStats.sport": "4",
+            "record_count": "6"
+        ]
+
+        def propz = [
+            "sport": "/person/sport[. != 'Football']"
+        ]
+
+        commonTest(propz, sports, expectedAttributes)
+    }
+
+    @Test
+    void testWithSizeLimit() {
+        runner.setProperty(CalculateRecordStats.LIMIT, "3")
+        def sports = [ "Soccer", "Soccer", "Soccer", "Football", "Football",
+               "Basketball", "Baseball", "Baseball", "Baseball", "Baseball",
+                "Skiing", "Skiing", "Skiing", "Snowboarding"
+        ]
+        def expectedAttributes = [
+            "recordStats.sport.Skiing": "3",
+            "recordStats.sport.Soccer": "3",
+            "recordStats.sport.Baseball": "4",
+            "recordStats.sport": String.valueOf(sports.size()),
+            "record_count": String.valueOf(sports.size())
+        ]
+
+        def propz = [
+            "sport": "/person/sport"
+        ]
+
+        commonTest(propz, sports, expectedAttributes)
+    }
+
+    private void commonTest(Map procProperties, List sports, Map expectedAttributes) {
+        int index = 1
+        sports.each { sport ->
+            recordParser.addRecord(index++, new MapRecord(personSchema, [
+                "name" : "John Doe",
+                "age"  : 48,
+                "sport": sport
+            ]))
+        }
+
+        procProperties.each { kv ->
+            runner.setProperty(kv.key, kv.value)
+        }
+
+        runner.enqueue("")
+        runner.run()
+        runner.assertTransferCount(CalculateRecordStats.REL_FAILURE, 0)
+        runner.assertTransferCount(CalculateRecordStats.REL_SUCCESS, 1)
+
+        def flowFiles = runner.getFlowFilesForRelationship(CalculateRecordStats.REL_SUCCESS)
+        def ff = flowFiles[0]
+        expectedAttributes.each { kv ->
+            Assert.assertNotNull("Missing ${kv.key}", ff.getAttribute(kv.key))
+            Assert.assertEquals(kv.value, ff.getAttribute(kv.key))
+        }
+    }
+}
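
Note: downstream components can act on the attributes written by this processor, for example to gate or route on the record count. A small sketch of such a check, assuming only the nifi-api FlowFile interface (hypothetical helper, not part of the commit):

import org.apache.nifi.flowfile.FlowFile;

public class RecordCountGuard {
    // Returns true when CalculateRecordStats counted at least `minimum` records upstream.
    static boolean hasAtLeast(FlowFile flowFile, int minimum) {
        String count = flowFile.getAttribute("record_count");
        return count != null && Integer.parseInt(count) >= minimum;
    }
}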