Posted to issues@nifi.apache.org by MikeThomsen <gi...@git.apache.org> on 2018/05/24 11:54:35 UTC

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

GitHub user MikeThomsen opened a pull request:

    https://github.com/apache/nifi/pull/2737

    NIFI-5231 Added RecordStats processor.

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MikeThomsen/nifi NIFI-5231

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2737
    
----
commit 3bd9faac86707b0da42a223db5416ee61ccebf10
Author: Mike Thomsen <mi...@...>
Date:   2018-05-23T23:30:11Z

    NIFI-5231 Added RecordStats processor.

----


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    @ijokarumawak Anything to add now or can we close this out?


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    That makes sense; I'm just thinking it through, and obviously I don't understand everything as well ;)
    I guess I never thought of provenance as including performance and stats data, so it seems like putting it there is just using it because it happens to be the thing that is available; co-opting it, so to speak.


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    I'm not sure how that'd work out because you need to actually read the flowfiles and calculate the stats.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194223753
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190885264
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        FieldValue fieldValue = value.get();
    +                        String approxValue = fieldValue.getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    --- End diff --
    
    ?


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192706910
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    --- End diff --
    
    Done.


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    @MikeThomsen Thanks for the updates. I will continue with a closer review of this when I have time, probably tomorrow.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194217053
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.RecordStats/additionalDetails.html ---
    @@ -0,0 +1,47 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<head>
    +    <meta charset="utf-8" />
    +    <title>RecordStats</title>
    +
    +    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
    +</head>
    +<body>
    +    <p>This processor takes in a record set and counts both the overall count and counts that are defined as dynamic properties
    +    that map a property name to a record path. Record path counts are provided at two levels:</p>
    +    <ul>
    +        <li>The overall count of all records that successfully evaluated a record path.</li>
    +        <li>A breakdown of counts of unique values that matched the record path operation.</li>
    +    </ul>
    +    <p>Consider the following record structure:</p>
    +    <pre>
    +        {
    +            "sport": "Soccer",
    +            "name": "John Smith"
    +        }
    +    </pre>
    +    <p>A valid mapping here would be <em>sport => /sport</em>.</p>
    +    <p>For a record set with JSON like that, five entries and 3 instances of soccer and two instances of football, it would set the following
    +    attributes:</p>
    +    <ul>
    +        <li>record_count: 5</li>
    +        <li>sport: 5</li>
    +        <li>sport.Soccer: 3</li>
    +        <li>sport.Football: 2</li>
    --- End diff --
    
    These property names should be more self-descriptive and should not overlap other property namespaces.
    I suggest the following names:
    
    |current|suggestion|
    |-------|------------|
    |record_count|recordStats.count|
    |sport|recordStats.sport.count|
    |sport.Soccer|recordStats.sport.count.Soccer|
    |sport.Football|recordStats.sport.count.Football|
    
    Then we can add more stats later, such as recordStats.age.min or recordStats.age.max ... etc
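
    (For illustration, a minimal sketch of a helper that would build attribute names under the
    suggested "recordStats." prefix; the method name and shape here are assumptions, not part of
    the PR:)

        // Hypothetical helper following the naming scheme proposed above.
        // statAttribute("sport", null)     -> "recordStats.sport.count"
        // statAttribute("sport", "Soccer") -> "recordStats.sport.count.Soccer"
        static String statAttribute(String propertyName, String value) {
            return value == null
                ? String.format("recordStats.%s.count", propertyName)
                : String.format("recordStats.%s.count.%s", propertyName, value);
        }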


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190990529
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        FieldValue fieldValue = value.get();
    +                        String approxValue = fieldValue.getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    --- End diff --
    
    "record_count"  => constant field maybe?


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194224226
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
    +        .name("record-stats-limit")
    +        .description("Limit the number of individual stats that are returned for each record path to the top N results.")
    +        .required(true)
    +        .defaultValue("10")
    +        .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    private RecordPathCache cache;
    +
    +    static final Set RELATIONSHIPS;
    +    static final List<PropertyDescriptor> PROPERTIES;
    +
    +    static {
    +        Set _rels = new HashSet();
    +        _rels.add(REL_SUCCESS);
    +        _rels.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
    +        List<PropertyDescriptor> _temp = new ArrayList<>();
    +        _temp.add(RECORD_READER);
    +        _temp.add(LIMIT);
    +        PROPERTIES = Collections.unmodifiableList(_temp);
    +    }
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192916019
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    --- End diff --
    
    Yes, we can add this later.
    
    I hoped it would be included when this processor is released. Changing it afterwards would require adding a new configuration property. Well, that makes sense in some cases, too. E.g. if a number field represents some category information, or acts as an enum, then the user would expect the number of occurrences per number value.
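
    (Purely a hypothetical sketch of the kind of opt-in property this would imply later; the name,
    description and default below are assumptions and are not part of the PR:)

        static final PropertyDescriptor COUNT_NUMERIC_AS_CATEGORY = new PropertyDescriptor.Builder()
            .name("record-stats-count-numeric-as-category")   // hypothetical property name
            .displayName("Count Numeric Values As Categories")
            .description("If true, numeric field values that act as categories or enums are "
                + "included in the per-value count breakdown.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();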


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192706576
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    +            retVal.put(RECORD_COUNT_ATTR, recordCount);
    +
    +            return retVal.entrySet().stream()
    +                .collect(Collectors.toMap(
    +                    e -> e.getKey(),
    +                    e -> e.getValue().toString()
    --- End diff --
    
    Ok. We can add a limit there.
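
    (One way such a limit could be applied, keeping only the top N counts per record path; a sketch
    only, assuming the limit value comes from the record-stats-limit property added in the later
    revision:)

        import java.util.LinkedHashMap;
        import java.util.Map;
        import java.util.stream.Collectors;

        class TopNSketch {
            // Keeps the N entries with the highest counts; ties are broken arbitrarily.
            static Map<String, Integer> topN(Map<String, Integer> counts, int limit) {
                return counts.entrySet().stream()
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(limit)
                    .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (a, b) -> a,
                        LinkedHashMap::new));
            }
        }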


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190723484
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        FieldValue fieldValue = value.get();
    +                        String approxValue = fieldValue.getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    --- End diff --
    
    Magic Strings
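
    For example, pulling them out as constants (names here are just for illustration, pick whatever fits the codebase):

        // Illustrative constants for the literals used when building stat keys.
        static final String RECORD_COUNT_ATTR = "record_count";
        static final String STAT_KEY_FORMAT = "%s.%s";

        // ...then in getStats():
        String key = String.format(STAT_KEY_FORMAT, entry.getKey(), approxValue);
        // ...and after the loop:
        retVal.put(RECORD_COUNT_ATTR, recordCount);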


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    Probably the reader and writer would have to get some context passed in where they can track state or increment stats, and then be configured with a 'reporting' task that sends the stats from a given context somewhere.
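
    Purely a hypothetical sketch of what I mean (nothing like this exists in NiFi today, all names made up):

        import java.util.Map;

        // A context the framework could hand to readers/writers so they can
        // track counters while processing records.
        public interface RecordStatsContext {
            void incrementCounter(String name, long delta);

            // A reporting task could periodically publish this snapshot to an
            // external store (time series DB, metrics system, etc.).
            Map<String, Long> snapshot();
        }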


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194217240
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
    +        .name("record-stats-limit")
    +        .description("Limit the number of individual stats that are returned for each record path to the top N results.")
    +        .required(true)
    +        .defaultValue("10")
    +        .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    private RecordPathCache cache;
    +
    +    static final Set RELATIONSHIPS;
    +    static final List<PropertyDescriptor> PROPERTIES;
    +
    +    static {
    +        Set _rels = new HashSet();
    +        _rels.add(REL_SUCCESS);
    +        _rels.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
    +        List<PropertyDescriptor> _temp = new ArrayList<>();
    +        _temp.add(RECORD_READER);
    +        _temp.add(LIMIT);
    +        PROPERTIES = Collections.unmodifiableList(_temp);
    +    }
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    --- End diff --
    
    I'd expect the dynamic properties to support EL with FlowFile attributes.
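
    i.e. something along these lines, building on the diff above (just a sketch; `getRecordPaths` would then need the incoming FlowFile passed in):

        protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
            return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .displayName(propertyDescriptorName)
                .dynamic(true)
                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
                // Allow the record path to be computed from FlowFile attributes.
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();
        }

        // ...and evaluate against the FlowFile when compiling each path:
        String val = context.getProperty(e).evaluateAttributeExpressions(flowFile).getValue();
        return cache.getCompiled(val);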


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192615490
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    --- End diff --
    
    Trivial: the `Map.getOrDefault` method can make these statements simpler.
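
    e.g., against the variables in the diff above:

        Integer stat = retVal.getOrDefault(key, 0);
        Integer baseStat = retVal.getOrDefault(entry.getKey(), 0);

        // Or collapse the whole get/increment/put dance into one call each:
        retVal.merge(key, 1, Integer::sum);
        retVal.merge(entry.getKey(), 1, Integer::sum);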


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    @ijokarumawak can you review?


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190714515
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    --- End diff --
    
    `input` and `is` as variable names are confusing here; can we name them closer to what they are to keep them straight?
    `flowFile` and `inputStream`?
    There is only one flowFile to track in this processor, so no need to get fancy ;)


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192569021
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        FieldValue fieldValue = value.get();
    +                        String approxValue = fieldValue.getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    --- End diff --
    
    Done.


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    For future work, I've submitted this to add stats for numerical values.
    https://issues.apache.org/jira/browse/NIFI-5291


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    @MikeThomsen Thanks for the updates. LGTM, +1! Merging.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194223936
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    --- End diff --
    
    Changed the names, but done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194224132
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.RecordStats/additionalDetails.html ---
    @@ -0,0 +1,47 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<head>
    +    <meta charset="utf-8" />
    +    <title>RecordStats</title>
    +
    +    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
    +</head>
    +<body>
    +    <p>This processor takes in a record set and counts both the overall count and counts that are defined as dynamic properties
    +    that map a property name to a record path. Record path counts are provided at two levels:</p>
    +    <ul>
    +        <li>The overall count of all records that successfully evaluated a record path.</li>
    +        <li>A breakdown of counts of unique values that matched the record path operation.</li>
    +    </ul>
    +    <p>Consider the following record structure:</p>
    +    <pre>
    +        {
    +            "sport": "Soccer",
    +            "name": "John Smith"
    +        }
    +    </pre>
    +    <p>A valid mapping here would be <em>sport => /sport</em>.</p>
    +    <p>For a record set like that with five entries, three instances of Soccer and two instances of Football, it would set the
    +    following attributes:</p>
    +    <ul>
    +        <li>record_count: 5</li>
    +        <li>sport: 5</li>
    +        <li>sport.Soccer: 3</li>
    +        <li>sport.Football: 2</li>
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194216928
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    --- End diff --
    
    Please add `recordStats.<User Defined Property Name>.count` and `recordStats.<User Defined Property Name>.count.<value>`
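
    Roughly like this (exact wording and the attribute names are up to you; this is only a sketch of the annotation block):

        @WritesAttributes({
            @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR,
                description = "A count of the records in the record set in the flowfile."),
            @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.count",
                description = "A count of the records matching the record path of a user-defined property."),
            @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.count.<value>",
                description = "A count of the records whose record path evaluated to the given value.")
        })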


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    @MikeThomsen this is cool.
    
    The only thing it makes me wonder is whether this kind of data couldn't be automatically generated and sent to a repository, almost like a new (or an actual) reporting task.
    
    This seems like it lends itself to time series analysis like other things.
    
    NiFi doesn't necessarily have to provide that repo.
    



---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190885214
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    --- End diff --
    
    Done.


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    Not sure if I like that approach. The beauty of what we did was that it just puts the data into the provenance repository, and there aren't that many flowfiles to track. Maybe a few hundred thousand over the entire data set if we use appropriately-sized batches from `GetMongo`. NiFi handles that like a champ, and s2s prov reporting has no problem rapid-firing it over to our tracking instance of NiFi.


---

[GitHub] nifi issue #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2737
  
    I'm not too familiar with the deep internals of the framework either. What we've seen is that with the records API it just makes sense to leverage the provenance system, because it already tracks the attributes in a clean way you can use for things like giving managers a nice little ELK dashboard for the warm fuzzies.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194216702
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    --- End diff --
    
    The `org.apache.nifi.processor.Processor` services file is not updated, so this new processor can't be used from a NiFi flow.
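
    i.e. the new class needs a line like the following added to `src/main/resources/META-INF/services/org.apache.nifi.processor.Processor` in nifi-standard-processors so the framework can discover it:

        org.apache.nifi.processors.standard.RecordStats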


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190990605
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        FieldValue fieldValue = value.get();
    +                        String approxValue = fieldValue.getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    --- End diff --
    
    not a biggie


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194216843
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    --- End diff --
    
    'Record Reader' should be required.
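    A minimal sketch of that change, reusing the builder already shown in the diff (only the `.required(true)` line is new):
    
        static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-stats-reader")
            .displayName("Record Reader")
            .description("A record reader to use for reading the records.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true) // marks the property as mandatory so the processor is invalid without a configured reader
            .build();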


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194224162
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
    +        .name("record-stats-limit")
    +        .description("Limit the number of individual stats that are returned for each record path to the top N results.")
    +        .required(true)
    +        .defaultValue("10")
    +        .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    private RecordPathCache cache;
    +
    +    static final Set RELATIONSHIPS;
    +    static final List<PropertyDescriptor> PROPERTIES;
    +
    +    static {
    +        Set _rels = new HashSet();
    +        _rels.add(REL_SUCCESS);
    +        _rels.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
    +        List<PropertyDescriptor> _temp = new ArrayList<>();
    +        _temp.add(RECORD_READER);
    +        _temp.add(LIMIT);
    +        PROPERTIES = Collections.unmodifiableList(_temp);
    +    }
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192619043
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    +                        stat++;
    +                        baseStat++;
    +
    +                        retVal.put(key, stat);
    +                        retVal.put(entry.getKey(), baseStat);
    +                    }
    +                }
    +
    +                recordCount++;
    +            }
    +            retVal.put(RECORD_COUNT_ATTR, recordCount);
    +
    +            return retVal.entrySet().stream()
    +                .collect(Collectors.toMap(
    +                    e -> e.getKey(),
    +                    e -> e.getValue().toString()
    --- End diff --
    
    For non-numeric values, such as the 'sport' dataset, the list of distinct values can be huge, on the order of thousands or more. That can be too big to store as FlowFile attributes. At the very least we need to cap the number of attributes this processor can add. It would be even more helpful if we could limit each record path to its top N values (e.g. soccer, football, basketball, etc.) and report only those, which would give better stats for a long-tail dataset.
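    As a rough sketch of the idea (names and placement are illustrative, not a prescription), the collected counts could be trimmed to the N largest entries before they are written out as attributes, assuming the `java.util.Map` and `java.util.stream.Collectors` imports already present in the diff:
    
        // Keeps only the 'limit' highest counts; a per-record-path cap would first group
        // the keys by their record path prefix and apply the same trim within each group.
        static Map<String, Integer> keepTopN(Map<String, Integer> counts, int limit) {
            return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(limit)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }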


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190885459
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    --- End diff --
    
    I guess we can drop that.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194223770
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192706932
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    +                        String key = String.format("%s.%s", entry.getKey(), approxValue);
    +                        Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    +                        Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190721904
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(input, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    --- End diff --
    
    Why is `fieldValue` needed? There are already a lot of *value variables in this loop.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194217227
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
    +        .name("record-stats-limit")
    +        .description("Limit the number of individual stats that are returned for each record path to the top N results.")
    +        .required(true)
    +        .defaultValue("10")
    +        .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    private RecordPathCache cache;
    +
    +    static final Set<Relationship> RELATIONSHIPS;
    +    static final List<PropertyDescriptor> PROPERTIES;
    +
    +    static {
    +        Set<Relationship> _rels = new HashSet<>();
    +        _rels.add(REL_SUCCESS);
    +        _rels.add(REL_FAILURE);
    +        RELATIONSHIPS = Collections.unmodifiableSet(_rels);
    +        List<PropertyDescriptor> _temp = new ArrayList<>();
    +        _temp.add(RECORD_READER);
    +        _temp.add(LIMIT);
    +        PROPERTIES = Collections.unmodifiableList(_temp);
    +    }
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    --- End diff --
    
    I think the `!p.getName().contains(RECORD_READER.getName())` part is not necessary.
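
    For reference, a minimal sketch of what the simplified filter could look like, keeping the same `cache` field and stream pipeline as the diff above (illustrative only, not the committed code):

        // Only dynamic properties hold RecordPaths; the Record Reader property is
        // not dynamic, so isDynamic() alone is a sufficient filter.
        protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
            return context.getProperties().keySet().stream()
                .filter(PropertyDescriptor::isDynamic)
                .collect(Collectors.toMap(
                    PropertyDescriptor::getName,
                    descriptor -> cache.getCompiled(context.getProperty(descriptor).getValue())));
        }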


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192614001
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    --- End diff --
    
    No validator is required for a property that identifies a ControllerService.
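
    For illustration, a descriptor that leans on identifiesControllerService() alone might look like the following sketch; required(true) is an assumption here, not something taken from the PR:

        static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-stats-reader")
            .displayName("Record Reader")
            .description("A record reader to use for reading the records.")
            .required(true)
            // identifiesControllerService() already validates that a service of the
            // given type is selected, so no explicit addValidator(...) call is needed.
            .identifiesControllerService(RecordReaderFactory.class)
            .build();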


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194216678
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    --- End diff --
    
    Since NiFi uses a naming convention of starting processor names with a verb, this processor should be named something like 'CalculateRecordStats'.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2737


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192707116
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    --- End diff --
    
    Let's do that as a separate ticket if you don't mind.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r194223780
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2737: NIFI-5231 Added RecordStats processor.

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r192618442
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@Tags({ "record", "stats", "metrics" })
    +@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
    +        "user-defined criteria on subsets of the record set.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +    @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    +})
    +public class RecordStats extends AbstractProcessor {
    +    static final String RECORD_COUNT_ATTR = "record_count";
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(flowFile)) {
    +            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +            RecordReader reader = factory.createRecordReader(flowFile, is, getLogger());
    +
    +            Map<String, Integer> retVal = new HashMap<>();
    +            Record record;
    +
    +            int recordCount = 0;
    +            while ((record = reader.nextRecord()) != null) {
    +                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    +                    RecordPathResult result = entry.getValue().evaluate(record);
    +                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    +                    if (value.isPresent() && value.get().getValue() != null) {
    +                        String approxValue = value.get().getValue().toString();
    --- End diff --
    
    Idea for improvement: if the RecordPath result is a number, would providing counts based on the value be useful? Beyond that, in addition to counts (the current `baseStat`), I'd also like to see min, max, sum, and optionally the sum of squared values.
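
    For example, a rough sketch of how those numeric aggregates could be accumulated per record path; the `NumericAggregate` class and the `aggregates` map are hypothetical names introduced here for illustration and are not part of the PR:

        // Hypothetical accumulator for count, min, max, sum and sum of squares.
        static final class NumericAggregate {
            long count;
            double min = Double.POSITIVE_INFINITY;
            double max = Double.NEGATIVE_INFINITY;
            double sum;
            double sumOfSquares;

            void accept(double value) {
                count++;
                min = Math.min(min, value);
                max = Math.max(max, value);
                sum += value;
                sumOfSquares += value * value;
            }
        }

        // Inside the record loop, assuming a Map<String, NumericAggregate> aggregates
        // is kept alongside retVal, numeric RecordPath results could feed it like so:
        Object raw = value.get().getValue();
        if (raw instanceof Number) {
            aggregates.computeIfAbsent(entry.getKey(), k -> new NumericAggregate())
                .accept(((Number) raw).doubleValue());
        }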


---