Posted to issues@nifi.apache.org by PrashanthVenkatesan <gi...@git.apache.org> on 2018/06/27 16:04:06 UTC

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

GitHub user PrashanthVenkatesan opened a pull request:

    https://github.com/apache/nifi/pull/2820

    NIFI-5327 Adding Netflowv5 protocol parser

    NIFI-5327 Netflowv5 protocol parser processor.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PrashanthVenkatesan/nifi NIFI-5327

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2820.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2820
    
----
commit 825f388c29b3d0905d4640b7df51ee42531b773c
Author: “PrashanthVenkatesan” <pr...@...>
Date:   2018-06-27T12:42:00Z

    NIFI-5327 Adding Netflowv5 protocol parser

----


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218555092
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    --- End diff --
    
    OK, cool. For now, let me use session.create() alone.
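    
    For reference, a minimal sketch of that change (illustrative only, assuming the generateJSON loop from the diff above): session.create(parent) already copies the parent's attributes onto the child, so the clone is not needed just to carry them over.
    
    ```
    // replaces session.clone(flowFile) in the loop above; the child FlowFile
    // inherits the parent's attributes without duplicating its content
    recordFlowFile = session.create(flowFile);
    ```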


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204218878
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-nar/pom.xml ---
    @@ -1,40 +1,37 @@
     <?xml version="1.0" encoding="UTF-8"?>
    --- End diff --
    
    Ah, Eclipse did the code formatting, I think. Let me change it back.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218401785
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    --- End diff --
    
    If we need to go there, you can do this later:
    
    ```
    FlowFile newFF = session.create(inputFlowFile);
    newFF = session.putAllAttributes(newFF, inputFlowFile.getAttributes());
    ```


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r210868364
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    --- End diff --
    
    As far as I know, NetFlow packets are generally exported as UDP datagrams, so I think the buffer length should also fall within the UDP datagram limit.
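    
    A hedged sketch of what such a guard might look like (illustrative only; the constant name and the early return are assumptions, not part of the diff):
    
    ```
    // 65535 bytes minus the IP and UDP headers is the usual ceiling for one datagram
    private static final int MAX_UDP_PAYLOAD = 65507;
    
    // inside onTrigger(), before allocating the read buffer:
    if (flowFile.getSize() > MAX_UDP_PAYLOAD) {
        getLogger().error("FlowFile {} is larger than a single UDP datagram; routing to failure", new Object[] { flowFile });
        session.transfer(flowFile, REL_FAILURE);
        return;
    }
    final byte[] buffer = new byte[(int) flowFile.getSize()];
    ```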


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @ottobackwards Do you have any bandwidth to test this PR?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501508
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network.parser;
    +
    +import java.util.OptionalInt;
    +
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toShort;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toInt;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toLong;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toIPV4;
    +
    +/**
    + * Networkv5 is Cisco data export format which contains one header and one or more flow records. This Parser parses the netflowv5 format. More information: @see
    + * <a href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">Netflowv5</a>
    + */
    +public final class Netflowv5Parser {
    +    private static final int HEADER_SIZE = 24;
    +    private static final int RECORD_SIZE = 48;
    +
    +    private static final int SHORT_TYPE = 0;
    +    private static final int INTEGER_TYPE = 1;
    +    private static final int LONG_TYPE = 2;
    +    private static final int IPV4_TYPE = 3;
    +
    +    private static final String headerField[] = { "version", "count", "sys_uptime", "unix_secs", "unix_nsecs", "flow_sequence", "engine_type", "engine_id", "sampling_interval" };
    +    private static final String recordField[] = { "srcaddr", "dstaddr", "nexthop", "input", "output", "dPkts", "dOctets", "first", "last", "srcport", "dstport", "pad1", "tcp_flags", "prot", "tos",
    +            "src_as", "dst_as", "src_mask", "dst_mask", "pad2" };
    +
    +    private final int portNumber;
    +
    +    private Object headerData[];
    +    private Object recordData[][];
    +
    +    public Netflowv5Parser(final OptionalInt portNumber) {
    +        this.portNumber = (portNumber.isPresent()) ? portNumber.getAsInt() : 0;
    +    }
    +
    +    public final int parse(final byte[] buffer) throws Throwable {
    +        final int version = toInt(buffer, 0, 2);
    +        assert version == 5 : "Version mismatch";
    +        final int count = toInt(buffer, 2, 2);
    --- End diff --
    
    Do we need any additional validation of the `buffer` variable like checking for a minimum length?
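    
    For illustration, one way such a check could look (a hedged sketch, not part of the diff; it reuses the HEADER_SIZE and RECORD_SIZE constants declared earlier in Netflowv5Parser):
    
    ```
    public final int parse(final byte[] buffer) throws Throwable {
        // reject anything smaller than a NetFlow v5 header before touching the fields
        if (buffer == null || buffer.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Buffer is smaller than a NetFlow v5 header (" + HEADER_SIZE + " bytes)");
        }
        final int version = toInt(buffer, 0, 2);
        if (version != 5) {
            throw new IllegalArgumentException("Version mismatch: expected 5, found " + version);
        }
        final int count = toInt(buffer, 2, 2);
        // make sure the declared record count actually fits in the buffer
        if (buffer.length < HEADER_SIZE + count * RECORD_SIZE) {
            throw new IllegalArgumentException("Buffer too short to hold " + count + " records");
        }
        // ... continue parsing the header and records as in the diff above
        return count;
    }
    ```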


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218554835
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    --- End diff --
    
    I tried commenting out that line, but I get an error in the JUnit test case.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204247590
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    Did you miss any attachments in your comment? Are you referring to something like this:
    [dataschema.txt](https://github.com/apache/nifi/files/2217231/dataschema.txt)
    Doesn't it require field-level descriptions?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Also, thinking about it, the NAR you have should be named the network processors NAR, not just the network NAR. When we do the controller services (record readers), they will need their own NAR, and that is the convention.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501822
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    --- End diff --
    
    `AbstractProcessor` will do this for you.
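
    For context, `AbstractProcessor` already wraps the session lifecycle around `onTrigger`, so the explicit commit/rollback here is redundant. A rough, simplified sketch of what the framework does (illustrative only, not the exact source):

        // Simplified sketch of the framework's wrapper around a subclass onTrigger
        public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
            final ProcessSession session = sessionFactory.createSession();
            try {
                onTrigger(context, session); // the subclass's onTrigger, as written in this PR
                session.commit();            // framework commits on normal return
            } catch (final Throwable t) {
                session.rollback(true);      // framework rolls back (and penalizes) on error
                throw t;
            }
        }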


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501748
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    --- End diff --
    
    `session.exportTo` or changing this to a lambda would be cleaner.
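
    For example, either of these would read more cleanly (illustrative sketches of the same read, not tested):

        // Option 1: lambda form (InputStreamCallback is a functional interface)
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));

        // Option 2: let the session export the content (assumes java.io.ByteArrayOutputStream is imported)
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(flowFile, baos);
        final byte[] bytes = baos.toByteArray();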


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    I think the conclusion at this point is:
    1. Send the RAW binary message through the "original" relationship.
    2. Remove the "template" relationship and document the JSON schema in additionalDetails.html.
    I will make these changes and push the code.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    When this PR goes in, I'll create a JIRA for a Netflow5RecordReader.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r203410598
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.getTemplate;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.templateID;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    --- End diff --
    
    I think you need to describe what the templates are. You may need to add additionalDetails documentation for this parser; actually, I think it will need it.
    
    Also, can you describe the templates here in the PR, please?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @ottobackwards Thanks for your valuable review.
    I thought that if the user's flow sends this JSON to some sink (say Kafka), it would be good to have the raw_data with it. Are you suggesting that I remove it?
    
    The template is provided for the same purpose: if an external system wants to parse the raw data or understand the parsing configuration, the template is necessary. Further ahead, if we support other netflow-related protocols that have the concept of dynamic templates, it is worth sending templates via a relationship.
    
    I understand your point that adding the raw data to the JSON looks like conflating concerns, but I felt that creating another relationship would create a separate content claim for each record, which might degrade the throughput of the processor.
    
    I would like to know your view on these points.
    I will incorporate all the other review comments and push the commit soon.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501605
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.nifi</groupId>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +
    +	<artifactId>nifi-network-processors</artifactId>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-api</artifactId>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    --- End diff --
    
    We should be able to dispense with these manual version numbers.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204250561
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    Yes, that is the type of thing I'm referring to.
    If you want to include the description, maybe put it outside.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    In NiFi, the way the template would be handled is not through a Processor; instead, you would implement this functionality as a RecordReader with a set schema (like the syslog schemas). Then you could choose to write the data out with the schema included in the Avro.
    
    Sending a template is not something that processors in standard NiFi usually do; it will be enough to just document the schema in the additionalDetails.html documentation.
    
    When this processor lands, I think I'm going to do the RecordReader.
    
    Then again, we should get someone else's opinion. @bbende, can you chime in here?



---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218311518
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.nifi</groupId>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +
    +	<artifactId>nifi-network-processors</artifactId>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-api</artifactId>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    --- End diff --
    
    I need clarification on this as well.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218293320
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    --- End diff --
    
    Can you please elaborate on this comment? What will AbstractProcessor do?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r206290668
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -29,6 +32,7 @@
         private static final int SHORT_TYPE = 0;
         private static final int INTEGER_TYPE = 1;
         private static final int LONG_TYPE = 2;
    --- End diff --
    
    I did some research; it doesn't support it in this version.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501429
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/pom.xml ---
    @@ -0,0 +1,43 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<parent>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<groupId>org.apache.nifi</groupId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +	<modelVersion>4.0.0</modelVersion>
    +	<artifactId>nifi-network-utils</artifactId>
    +	<packaging>jar</packaging>
    +	<dependencies>
    +		<dependency>
    +			<groupId>com.fasterxml.jackson.core</groupId>
    +			<artifactId>jackson-databind</artifactId>
    +			<version>2.7.8</version>
    --- End diff --
    
    Should be 2.9.5 to keep things consistent.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Committers,
    Is anyone available to review this PR?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501922
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    +            // Add JSON Objects
    +            generateJSONUtil(results, parser, record++);
    +
    +            recordFlowFile = session.write(recordFlowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                        outputStream.write(mapper.writeValueAsBytes(results));
    +                    }
    +                }
    +            });
    +            // Adjust the FlowFile mime.type attribute
    +            recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +            // Update the provenance for good measure
    +            session.getProvenanceReporter().modifyContent(recordFlowFile, "Replaced content with parsed netflowv5 fields and values");
    --- End diff --
    
    If you go with `create`, you can get rid of this. Something to keep in mind here is that, depending on the volume and velocity of the data, you could easily overwhelm the older provenance repository. The write-ahead one could probably handle it, but you might cause problems for people who haven't migrated (and I'm not sure whether the write-ahead one is the default now).
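
    As a rough illustration of the `create` approach (sketch only, not tested against this PR):

        // Create a child FlowFile instead of cloning; the framework records the fork
        // provenance on commit, so the explicit modifyContent() call should not be needed
        FlowFile recordFlowFile = session.create(flowFile);
        recordFlowFile = session.write(recordFlowFile, out -> out.write(mapper.writeValueAsBytes(results)));
        recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        multipleRecords.add(recordFlowFile);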


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501991
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    +            // Add JSON Objects
    +            generateJSONUtil(results, parser, record++);
    +
    +            recordFlowFile = session.write(recordFlowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                        outputStream.write(mapper.writeValueAsBytes(results));
    +                    }
    +                }
    +            });
    +            // Adjust the FlowFile mime.type attribute
    +            recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +            // Update the provenance for good measure
    +            session.getProvenanceReporter().modifyContent(recordFlowFile, "Replaced content with parsed netflowv5 fields and values");
    +            multipleRecords.add(recordFlowFile);
    +        }
    +    }
    +
    +    private void generateKV(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Map<String, String> attributes, final Netflowv5Parser parser,
    +            final int processedRecord) {
    +        int numberOfRecords = processedRecord;
    +        generateHeaderAttributes(attributes, parser);
    +
    +        final String[] fieldname = getRecordFields();
    +        int record = 0;
    +        FlowFile recordFlowFile = flowFile;
    +        while (numberOfRecords-- > 0) {
    +            // Process KVs of the Flow Record fields
    +            final Object[] fieldvalue = parser.getRecordData()[record++];
    +            for (int i = 0; i < fieldname.length; i++) {
    +                attributes.put("netflowv5.record." + fieldname[i], String.valueOf(fieldvalue[i]));
    +            }
    +            recordFlowFile = session.clone(flowFile);
    --- End diff --
    
    Switch to `session.create`
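
    For illustration, a minimal sketch of that substitution (my reading of the suggestion, not the committed patch): `session.create(parent)` yields an empty child FlowFile that keeps the lineage to the original, whereas `session.clone(flowFile)` also copies content that is overwritten right afterwards.

    ```java
    // Instead of cloning the original (its content is rewritten immediately):
    // FlowFile recordFlowFile = session.clone(flowFile);

    // Create an empty child FlowFile; provenance lineage to the parent is preserved.
    FlowFile recordFlowFile = session.create(flowFile);
    ```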


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Anything not Windows-specific?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r212195228
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    --- End diff --
    
    @MikeThomsen  Can you share your view on this?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204215111
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    Are you using "Template" in the JSON you output?
    I thought you removed that. The schema should be exactly what is in the record FlowFile.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r206164076
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -29,6 +32,7 @@
         private static final int SHORT_TYPE = 0;
         private static final int INTEGER_TYPE = 1;
         private static final int LONG_TYPE = 2;
    --- End diff --
    
    So netflow5 doesn't support IPV6? If not then cool


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @bbende when he says Template he means a JSON record with schema information.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @joewitt @bbende AFAIK, I have no resources for testing this against a live Cisco system. What are your thoughts on merging it if everything checks out but I can't do a live test?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218566840
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.nifi</groupId>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +
    +	<artifactId>nifi-network-processors</artifactId>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-api</artifactId>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    --- End diff --
    
    No, you should be able to safely remove the version number.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    The build with tests and contrib-check was fine as well.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204246781
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    No, it doesn't. The output JSON looks like this:
    [data.txt](https://github.com/apache/nifi/files/2217192/data.txt)
    So, you want me to change the docs to show the actual JSON schema (Avro version)? I thought of documenting just the structure as given by the RFC in JSON format, hence I used the word "template".



---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r205314387
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    I am sorry @PrashanthVenkatesan, I've been on vacation the last couple of days.  I'm fine with the review, I just want to run everything again.  I will try as soon as I can.
    
    I am not a committer though, so even with my potential +1 you will still need a committer to sign off and merge.



---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204390557
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    I think something like what is in dataschema.txt is fine. If we think people are going to use the record processors on the output, then it wouldn't hurt to also have the Avro schema, but not totally necessary. 


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r203410039
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.getTemplate;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.templateID;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and adds attributes to the FlowFile for headers.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from ListenUDP") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields."),
    +        @WritesAttribute(attribute = "templateID", description = "template ID"), @WritesAttribute(attribute = "templateDefinition", description = "Template Definition - one time") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    // To send the template during first run
    +    private transient AtomicBoolean isFirstRun = new AtomicBoolean(true);
    +    private boolean append_raw_to_json = false;
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + "attribute, fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    --- End diff --
    
    Your boolean Properties should have AllowableValues of "true" and "false" so users get the drop-down.
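
    A minimal sketch of what that could look like for the APPEND_RAW_MESSAGE_TO_JSON property (illustrative only, not the final patch):

    ```java
    public static final PropertyDescriptor APPEND_RAW_MESSAGE_TO_JSON = new PropertyDescriptor.Builder()
            .name("APPEND_RAW_MESSAGE_TO_JSON")
            .displayName("Append raw message to JSON")
            .description("When using flowfile-content (i.e. JSON output), add the original "
                    + "netflowv5 message to the resulting JSON object as a string under _raw.")
            .allowableValues("true", "false") // renders as a drop-down in the UI instead of free text
            .defaultValue("true")
            .required(true)
            .build();
    ```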


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204266885
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    @bbende do you have an opinion on how to document the json?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r205312374
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    Any other changes required?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @MikeThomsen One of the builds failed for an unrelated reason. How can I retrigger the build without making a new commit?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @ottobackwards @MikeThomsen Thanks for all your support and guidance on my first open-source contribution. Soon, I will try to contribute a NetFlowv9 processor.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @PrashanthVenkatesan if any of the builds passed, don't worry about it. It's the same test, just localized to different locale settings.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218401302
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.nifi</groupId>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +
    +	<artifactId>nifi-network-processors</artifactId>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-api</artifactId>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    --- End diff --
    
    I believe those are listed under the `<dependencyManagement>` block, and thus the version numbers aren't needed. It's not necessary to remove them, but I think you can do so without breaking anything.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218553325
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network.parser;
    +
    +import java.util.OptionalInt;
    +
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toShort;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toInt;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toLong;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toIPV4;
    +
    +/**
    + * Networkv5 is Cisco data export format which contains one header and one or more flow records. This Parser parses the netflowv5 format. More information: @see
    + * <a href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">Netflowv5</a>
    + */
    +public final class Netflowv5Parser {
    +    private static final int HEADER_SIZE = 24;
    +    private static final int RECORD_SIZE = 48;
    +
    +    private static final int SHORT_TYPE = 0;
    +    private static final int INTEGER_TYPE = 1;
    +    private static final int LONG_TYPE = 2;
    +    private static final int IPV4_TYPE = 3;
    +
    +    private static final String headerField[] = { "version", "count", "sys_uptime", "unix_secs", "unix_nsecs", "flow_sequence", "engine_type", "engine_id", "sampling_interval" };
    +    private static final String recordField[] = { "srcaddr", "dstaddr", "nexthop", "input", "output", "dPkts", "dOctets", "first", "last", "srcport", "dstport", "pad1", "tcp_flags", "prot", "tos",
    +            "src_as", "dst_as", "src_mask", "dst_mask", "pad2" };
    +
    +    private final int portNumber;
    +
    +    private Object headerData[];
    +    private Object recordData[][];
    +
    +    public Netflowv5Parser(final OptionalInt portNumber) {
    +        this.portNumber = (portNumber.isPresent()) ? portNumber.getAsInt() : 0;
    +    }
    +
    +    public final int parse(final byte[] buffer) throws Throwable {
    +        final int version = toInt(buffer, 0, 2);
    +        assert version == 5 : "Version mismatch";
    --- End diff --
    
    I didn't use any NiFi-related dependency in this module, hence I will throw a plain Exception instead of ProcessException. Is that fine?
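
    A minimal sketch of the check I have in mind, assuming an unchecked exception is acceptable in the parser module (illustrative only):

    ```java
    final int version = toInt(buffer, 0, 2);
    if (version != 5) {
        // Fail fast rather than relying on an assert, which is disabled at runtime by default.
        throw new IllegalArgumentException("Version mismatch: expected 5 but was " + version);
    }
    ```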


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    I haven't looked at any of the code so keep that in mind :)
    
    If we are talking about NiFi templates, then they are just examples of how to use a processor or set of processors, which you can then give to someone to help them get started. We have a wiki page where people have posted some for the community:
    
    https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
    
    I think the recent work @ottobackwards did with the syslog record readers is a good example of what we can do for this case. Meaning later on we can implement a NetflowV5RecordReader, which then lets you use stuff like ConvertRecord to go from netflow to any format like JSON, CSV, Avro.
    
    For now, if this processor always produces JSON, then documenting the expected output format in additionalDetails.html seems sufficient to me.
    
    I don't really know enough about what people do with netflow data to know if keeping the raw message with the parsed one makes sense. I would say typically we wouldn't keep the raw message with the parsed one, but you could always have an option to control that if you thought it was necessary. The processor can also have an original relationship as Otto suggested, although at that point the original and parsed data are completely separate.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218554585
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.nifi</groupId>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +
    +	<artifactId>nifi-network-processors</artifactId>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-api</artifactId>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    --- End diff --
    
    So you mean I can safely remove all the NiFi-related dependencies where the version is mentioned?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @ottobackwards Thanks for your support.
    @MikeThomsen @bbende @mattyb149 - Can anyone review this if you have bandwidth?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r206144210
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -29,6 +32,7 @@
         private static final int SHORT_TYPE = 0;
         private static final int INTEGER_TYPE = 1;
         private static final int LONG_TYPE = 2;
    --- End diff --
    
    To my knowledge, there is no IPv6 format in the parsed output.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r203414488
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.getTemplate;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.templateID;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and adds attributes to the FlowFile for headers.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from ListenUDP") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields."),
    +        @WritesAttribute(attribute = "templateID", description = "template ID"), @WritesAttribute(attribute = "templateDefinition", description = "Template Definition - one time") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    // To send the template during first run
    +    private transient AtomicBoolean isFirstRun = new AtomicBoolean(true);
    +    private boolean append_raw_to_json = false;
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + "attribute, fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final PropertyDescriptor APPEND_RAW_MESSAGE_TO_JSON = new PropertyDescriptor.Builder().name("APPEND_RAW_MESSAGE_TO_JSON").displayName("Append raw message to JSON")
    +            .description("When using flowfile-content (i.e. JSON output), add the original netflowv5 message to " + "the resulting JSON object. The original message is added as a string to _raw.")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR).required(true).defaultValue("true").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    static final Relationship REL_ONTEMPLATE = new Relationship.Builder().name("template_received")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 template will be transferred to this Relationship.").build();
    +    static final Relationship REL_ONDATA = new Relationship.Builder().name("data_received")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<Relationship>();
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_ONTEMPLATE);
    +        relationships.add(REL_ONDATA);
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
    +        descriptors.add(FIELDS_DESTINATION);
    +        descriptors.add(APPEND_RAW_MESSAGE_TO_JSON);
    +        return descriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        append_raw_to_json = context.getProperty(APPEND_RAW_MESSAGE_TO_JSON).asBoolean();
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            // Transfer one-time-template
    +            if (isFirstRun.getAndSet(false)) {
    +                FlowFile templateFlowFile = session.clone(flowFile);
    +                // Get Template Snapshot
    +                final ObjectNode results = getTemplate();
    +                templateFlowFile = session.write(templateFlowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(OutputStream out) throws IOException {
    +                        try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                            outputStream.write(mapper.writeValueAsBytes(results));
    +                        }
    +                    }
    +                });
    +
    +                templateFlowFile = session.putAttribute(templateFlowFile, "templateDefinition", mapper.writeValueAsString(results));
    +                // Adjust the FlowFile mime.type attribute
    +                templateFlowFile = session.putAttribute(templateFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +                session.transfer(templateFlowFile, REL_ONTEMPLATE);
    +            }
    +
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Ready to transfer to success and commit
    --- End diff --
    
    Shouldn't there be a ProvenanceReporter event here?
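    
    e.g. something along these lines right before the transfer (just a sketch, reusing the names from the diff above):
    
        // Record the routing of the one-time template FlowFile, mirroring the
        // provenance events emitted for the parsed data records further down.
        session.getProvenanceReporter().route(templateFlowFile, REL_ONTEMPLATE);
        session.transfer(templateFlowFile, REL_ONTEMPLATE);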


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218569285
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network.parser;
    +
    +import java.util.OptionalInt;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toShort;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toInt;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toLong;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toIPV4;
    +
    +/**
    + * Networkv5 is Cisco data export format which contains one header and one or more flow records. This Parser parses the netflowv5 format. More information: @see
    + * <a href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">Netflowv5</a>
    + */
    +public final class Netflowv5Parser {
    +    private static final int HEADER_SIZE = 24;
    +    private static final int RECORD_SIZE = 48;
    +
    +    private static final int SHORT_TYPE = 0;
    +    private static final int INTEGER_TYPE = 1;
    +    private static final int LONG_TYPE = 2;
    +    private static final int IPV4_TYPE = 3;
    +
    +    private static final String headerField[] = { "version", "count", "sys_uptime", "unix_secs", "unix_nsecs", "flow_sequence", "engine_type", "engine_id", "sampling_interval" };
    +    private static final String recordField[] = { "srcaddr", "dstaddr", "nexthop", "input", "output", "dPkts", "dOctets", "first", "last", "srcport", "dstport", "pad1", "tcp_flags", "prot", "tos",
    +            "src_as", "dst_as", "src_mask", "dst_mask", "pad2" };
    +
    +    private final int portNumber;
    +
    +    private Object headerData[];
    +    private Object recordData[][];
    +
    +    public Netflowv5Parser(final OptionalInt portNumber) {
    +        this.portNumber = (portNumber.isPresent()) ? portNumber.getAsInt() : 0;
    +    }
    +
    +    public final int parse(final byte[] buffer) throws Throwable {
    +        if( !isValid(buffer.length) )
    --- End diff --
    
    Curly brackets are missing here, and below with the version check.
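    
    For example (just a sketch of the brace style; the bodies stand in for whatever the code already does, and the `version != 5` condition is my guess at what the version check looks like):
    
        if (!isValid(buffer.length)) {
            // existing invalid-length handling
        }
    
        if (version != 5) {
            // existing version-mismatch handling
        }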


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Is that template _ever_ going to change?  


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @PrashanthVenkatesan you also have a merge conflict. That'll need to be resolved as well.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214502000
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    +            // Add JSON Objects
    +            generateJSONUtil(results, parser, record++);
    +
    +            recordFlowFile = session.write(recordFlowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                        outputStream.write(mapper.writeValueAsBytes(results));
    +                    }
    +                }
    +            });
    +            // Adjust the FlowFile mime.type attribute
    +            recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +            // Update the provenance for good measure
    +            session.getProvenanceReporter().modifyContent(recordFlowFile, "Replaced content with parsed netflowv5 fields and values");
    +            multipleRecords.add(recordFlowFile);
    +        }
    +    }
    +
    +    private void generateKV(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Map<String, String> attributes, final Netflowv5Parser parser,
    +            final int processedRecord) {
    +        int numberOfRecords = processedRecord;
    +        generateHeaderAttributes(attributes, parser);
    +
    +        final String[] fieldname = getRecordFields();
    +        int record = 0;
    +        FlowFile recordFlowFile = flowFile;
    +        while (numberOfRecords-- > 0) {
    +            // Process KVs of the Flow Record fields
    +            final Object[] fieldvalue = parser.getRecordData()[record++];
    +            for (int i = 0; i < fieldname.length; i++) {
    +                attributes.put("netflowv5.record." + fieldname[i], String.valueOf(fieldvalue[i]));
    +            }
    +            recordFlowFile = session.clone(flowFile);
    +            recordFlowFile = session.putAllAttributes(recordFlowFile, attributes);
    +            multipleRecords.add(recordFlowFile);
    +        }
    +    }
    +
    +    private OptionalInt resolvePort(final FlowFile flowFile) {
    +        final String port;
    +        if ((port = flowFile.getAttribute("udp.port")) != null)
    --- End diff --
    
    Should have curly brackets.
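    
    Something like this, for example (a sketch only; I'm assuming the method keeps its current lookup of the `udp.port` attribute and that the attribute holds a plain integer):
    
        private OptionalInt resolvePort(final FlowFile flowFile) {
            final String port = flowFile.getAttribute("udp.port");
            if (port != null) {
                return OptionalInt.of(Integer.parseInt(port));
            }
            return OptionalInt.empty();
        }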


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501717
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    --- End diff --
    
    According to Wikipedia, it's a maximum of 64 KB, so I'm good with that.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    You're right. I will format those fields **(srcaddr, dstaddr & nexthop)** as IP addresses.
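    
    Roughly along these lines (a sketch only; the actual `ConversionUtil.toIPV4` signature may differ):
    
        // Convert a 4-byte, big-endian NetFlow address field into dotted-quad form.
        private static String toIPV4(final byte[] buffer, final int offset) {
            return String.format("%d.%d.%d.%d",
                    buffer[offset] & 0xFF,
                    buffer[offset + 1] & 0xFF,
                    buffer[offset + 2] & 0xFF,
                    buffer[offset + 3] & 0xFF);
        }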


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    You can trigger a build by closing and reopening the PR.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218567833
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -178,7 +178,7 @@ private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSes
                 results.set("port", mapper.valueToTree(parser.getPortNumber()));
                 results.set("format", mapper.valueToTree("netflowv5"));
     
    -            recordFlowFile = session.clone(flowFile);
    +            recordFlowFile = session.create(flowFile);
    --- End diff --
    
    Not related to this PR, but for future reference: if you choose `INPUT_ALLOWED` you'll need to do something like this: `flowFile != null ? session.create(flowFile) : session.create()`, because `ProcessSession`'s implementation doesn't handle a null input.
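    
    i.e. roughly (a sketch):
    
        FlowFile flowFile = session.get();
        // With INPUT_ALLOWED the processor can be triggered without an incoming FlowFile,
        // so only use the parent-linking overload when there actually is a parent.
        FlowFile recordFlowFile = (flowFile != null) ? session.create(flowFile) : session.create();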


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204246930
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    So, since this isn't a record processor/reader, I wouldn't put the Avro schema there; we'll put the Avro schema in that one.
    
    I would put the exact JSON that you output, with the values being the 'types' from your schema, and above it, in the description, just say that that is what you are doing.
    
    Maybe follow that with an example:
    
    "Here is the structure of the output NetFlow JSON, with the types:"
    the JSON
    
    "For example:"
    the JSON with data
    



---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Currently I can find only Windows applications. I'll check for any generator available that runs on Linux. If not, I will write a generator and share it ASAP.
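    
    In the meantime, a minimal sender like this could work for local testing (just a sketch, not the eventual generator; only the version, count, and unix_secs header fields are filled in, everything else is left at zero, and the collector port 2055 is arbitrary):
    
        import java.net.DatagramPacket;
        import java.net.DatagramSocket;
        import java.net.InetAddress;
        import java.nio.ByteBuffer;
    
        public class Netflowv5TestSender {
            public static void main(String[] args) throws Exception {
                // 24-byte header followed by a single 48-byte flow record, big-endian.
                final ByteBuffer packet = ByteBuffer.allocate(24 + 48);
                packet.putShort((short) 5);                                 // version
                packet.putShort((short) 1);                                 // count: one record
                packet.putInt(0);                                           // sys_uptime
                packet.putInt((int) (System.currentTimeMillis() / 1000L)); // unix_secs
                // Remaining header and record fields stay zeroed for this smoke test.
                final byte[] payload = packet.array();
                try (DatagramSocket socket = new DatagramSocket()) {
                    socket.send(new DatagramPacket(payload, payload.length,
                            InetAddress.getLoopbackAddress(), 2055));
                }
            }
        }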


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @PrashanthVenkatesan any updates?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501765
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    --- End diff --
    
    Wrap this with an `isDebugEnabled()` check so we don't generate a bunch of strings.
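    
    i.e. something like this (sketch):
    
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
        }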


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2820


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204240871
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    The schema should match what is sent in the relationship. Does the data that gets sent out (the JSON you generate) have the word "template" in it?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218292092
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    --- End diff --
    
    I can use session.create(). But do you think users might need the attributes from the original FlowFile in the future? If so, we should stick with session.clone(). What's your view on this?
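    
    If I understand it correctly, `session.create(flowFile)` still inherits the parent's attributes and only skips copying the content, e.g.:
    
        // Child FlowFile keeps the parent's attributes (apart from uuid) but starts
        // with empty content, which this processor overwrites with the JSON anyway.
        recordFlowFile = session.create(flowFile);
    
    so the attributes would be preserved either way.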


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501899
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    --- End diff --
    
    I think `session.create(FlowFile)` is the right one to use here because it will establish a parent-child relationship between the newly created record and the input flowfile. I think `clone` also clones the content so that could bog NiFi down a bit as well.
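    
    For example, something along these lines (just an untested sketch of the idea):
    
    ```java
    // create() keeps the lineage (parent/child) without copying the parent's content
    FlowFile recordFlowFile = session.create(flowFile);
    // then write only this record's JSON into the new flowfile
    recordFlowFile = session.write(recordFlowFile, out -> out.write(mapper.writeValueAsBytes(results)));
    ```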


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @ottobackwards  So you mean to create an "original" relationship and emit the binary RAW data to that relationship? Fine, I can make that change.
    
    Regarding the template: the **netflowv5** template doesn't change. But going forward, if we write processors for related protocols like netflowv9, ipfix, etc., those protocols have dynamic templates (the template changes at runtime, and we need to parse it from the incoming packet)...


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r210782792
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    --- End diff --
    
    This can be very dangerous. How big are the flowfiles going to be on average?
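    
    To make the concern concrete, a guard before the allocation is the kind of thing I have in mind (rough sketch, untested; `MAX_DATAGRAM_SIZE` is a made-up constant based on the v5 format of a 24-byte header plus at most 30 records of 48 bytes each):
    
    ```java
    // hypothetical upper bound for a single netflowv5 datagram
    final int MAX_DATAGRAM_SIZE = 24 + 30 * 48;
    final long size = flowFile.getSize();
    if (size > MAX_DATAGRAM_SIZE) {
        getLogger().error("FlowFile {} is {} bytes, larger than any netflowv5 datagram; routing to failure",
                new Object[] { flowFile, size });
        session.transfer(flowFile, REL_FAILURE);
        return;
    }
    final byte[] buffer = new byte[(int) size];
    ```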


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r206125096
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -29,6 +32,7 @@
         private static final int SHORT_TYPE = 0;
         private static final int INTEGER_TYPE = 1;
         private static final int LONG_TYPE = 2;
    --- End diff --
    
    I hate to ask, but is there no IPv6 that could be in there?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @PrashanthVenkatesan thanks for putting all the work in on this. I'm most likely not going to be able to test this for a couple of days, but I will.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204214786
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-nar/pom.xml ---
    @@ -1,40 +1,37 @@
     <?xml version="1.0" encoding="UTF-8"?>
    --- End diff --
    
    Not sure how this formatting changed, but it should be put back the way it was.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    I will try.  Can you fill out the checkboxes in the PR template above?


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    [Generator.zip](https://github.com/apache/nifi/files/2177116/Generator.zip)
    @ottobackwards  Attached data generator jar. Although this is rough code, hope it serves the purpose. Please check the readme for usage.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @ottobackwards  You're right, adding RAW to the JSON is a configurable property, and I also see no use for the RAW data downstream, so I will remove that property. Fine?
    
    However, the template is still required. One scenario: assume the user wants to convert this JSON to Avro downstream (external to NiFi). They then need to know the **data type** of each field to create the Avro schema, so sending a ONE-TIME template would help downstream consumers build that schema.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204404608
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    I'll put the avro in the record reader's additionalDetails.  I think it is confusing to have it with the processor.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r203415821
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/parser/util/ConversionUtil.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network.parser.util;
    +
    +import java.math.BigInteger;
    +import java.util.Arrays;
    +
    +public final class ConversionUtil {
    +    public static final BigInteger to_bigint(final byte[] buffer, final int offset, final int length) {
    --- End diff --
    
    I don't think to_bigint is the right naming convention for Java.
    to_byte either.
    
    toBigInteger and toByte would be the conventional names.
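    
    Something in this shape is what I'd expect (the body below is only illustrative, since the diff is cut off here; it needs java.math.BigInteger and java.util.Arrays):
    
    ```java
    // interpret the selected bytes as an unsigned big-endian value
    public static BigInteger toBigInteger(final byte[] buffer, final int offset, final int length) {
        return new BigInteger(1, Arrays.copyOfRange(buffer, offset, offset + length));
    }
    ```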


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    I think having an original relationship for flows that want to pass the data on as-is is a better option myself.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Sorry for the delay @MikeThomsen. I will make the changes and push by EOD.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r210782553
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/pom.xml ---
    @@ -0,0 +1,67 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.nifi</groupId>
    +		<artifactId>nifi-network-bundle</artifactId>
    +		<version>1.8.0-SNAPSHOT</version>
    +	</parent>
    +
    +	<artifactId>nifi-network-processors</artifactId>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-api</artifactId>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.nifi</groupId>
    +			<artifactId>nifi-network-utils</artifactId>
    +			<version>1.8.0-SNAPSHOT</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>com.fasterxml.jackson.core</groupId>
    +			<artifactId>jackson-databind</artifactId>
    +			<version>2.7.8</version>
    --- End diff --
    
    I think the rest of NiFi is on 2.9.X FWIW


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    @PrashanthVenkatesan It looked to me that the raw data was NOT part of the JSON. Am I mistaken? I don't think that is good practice, having JSON + something else.
    If you put the raw content in a JSON field (maybe base64-encoded or something), that would work, I think.
    
    So IF I configure output to content and IF I select "include raw in output", THEN encode the raw data and put it in the JSON.
    
    Something like that.
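    
    Purely to illustrate (untested, and `includeRaw` is a property that doesn't exist yet; this would sit in the JSON-building path next to where `results` is populated):
    
    ```java
    if (includeRaw) {
        // stash the original datagram alongside the parsed fields
        results.put("raw", Base64.getEncoder().encodeToString(buffer));
    }
    ```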
    
    I don't understand how the output of the NetFlow parsing could differ in a way that would need templates. When would there be different JSON?
    
    
    



---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    As far as I can see, this PR is in great shape. Good work @PrashanthVenkatesan.
    
    Tests and contrib checks are great. I manually tested with the sample generator and the output looks like it will be useful. Also, the factoring of the submittal will make the record reader follow-up a breeze ;)
    
    +1 from me.
    
    @MikeThomsen, @bbende, @mattyb149  ?   anyone have time to take a look at this?
    
    



---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r204218983
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    I removed "Template" relationship.. now, ONLY parsed data and "original" i output..  I updated this docs with "Template" json schema for user reference as per your review comments.


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    OK. Contrib check and build pass.
    Everything runs the way it should. My question is on the data.
    
    The src and dest addresses are just the raw numbers. They are supposed to be IP addresses.
    I think that formatting them as IP addresses rather than just as numbers would be more usable.
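    
    Just to illustrate what I mean by formatting (not suggesting this exact code):
    
    ```java
    // render a 32-bit address value as the usual dotted-quad string
    static String toDottedQuad(final long addr) {
        return ((addr >> 24) & 0xFF) + "." + ((addr >> 16) & 0xFF) + "."
                + ((addr >> 8) & 0xFF) + "." + (addr & 0xFF);
    }
    ```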
    
    Is there something I'm missing? I don't have an example from anything else that shows NetFlow data.
    
    Are there any other fields that are output 'raw' that could be formatted?
    
    



---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r218401389
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    --- End diff --
    
    The framework will automatically commit the session for you, so the explicit `session.commit()` here isn't needed.
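    
    In other words, the end of the happy path could just be (sketching, untested):
    
    ```java
    session.transfer(flowFile, REL_ORIGINAL);
    session.transfer(multipleRecords, REL_SUCCESS);
    session.adjustCounter("Records Processed", processedRecord, false);
    // no explicit session.commit(): AbstractProcessor commits the session when onTrigger returns
    ```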


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501416
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/docs/org.apache.nifi.processors.network.ParseNetflowv5/additionalDetails.html ---
    @@ -0,0 +1,74 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +<head>
    +<meta charset="utf-8" />
    +<title>Netflowv5Parser</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<p>
    +		Netflowv5Parser processor parses the ingress netflowv5 datagram format
    +		and transfers it either as flowfile attributes or JSON object.
    +		Netflowv5 format has predefined schema named "template" for parsing
    +		the netflowv5 record. More information:&nbsp;<a title="RFC-netflowv5"
    +			href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">RFC-netflowv5</a>
    +	</p>
    --- End diff --
    
    My $0.02 is that we keep this as-is and have a separate, record-based version of this. So I'm fine with this.


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r203414910
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -0,0 +1,123 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network.parser;
    +
    +import java.util.OptionalInt;
    +
    +import org.apache.nifi.processors.network.parser.util.ConversionUtil;
    +
    --- End diff --
    
    Can you add a link to the standard (and the version of that standard) that this parses and that the field names come from?


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501495
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-utils/src/main/java/org/apache/nifi/processors/network/parser/Netflowv5Parser.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network.parser;
    +
    +import java.util.OptionalInt;
    +
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toShort;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toInt;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toLong;
    +import static org.apache.nifi.processors.network.parser.util.ConversionUtil.toIPV4;
    +
    +/**
    + * Networkv5 is Cisco data export format which contains one header and one or more flow records. This Parser parses the netflowv5 format. More information: @see
    + * <a href="https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html">Netflowv5</a>
    + */
    +public final class Netflowv5Parser {
    +    private static final int HEADER_SIZE = 24;
    +    private static final int RECORD_SIZE = 48;
    +
    +    private static final int SHORT_TYPE = 0;
    +    private static final int INTEGER_TYPE = 1;
    +    private static final int LONG_TYPE = 2;
    +    private static final int IPV4_TYPE = 3;
    +
    +    private static final String headerField[] = { "version", "count", "sys_uptime", "unix_secs", "unix_nsecs", "flow_sequence", "engine_type", "engine_id", "sampling_interval" };
    +    private static final String recordField[] = { "srcaddr", "dstaddr", "nexthop", "input", "output", "dPkts", "dOctets", "first", "last", "srcport", "dstport", "pad1", "tcp_flags", "prot", "tos",
    +            "src_as", "dst_as", "src_mask", "dst_mask", "pad2" };
    +
    +    private final int portNumber;
    +
    +    private Object headerData[];
    +    private Object recordData[][];
    +
    +    public Netflowv5Parser(final OptionalInt portNumber) {
    +        this.portNumber = (portNumber.isPresent()) ? portNumber.getAsInt() : 0;
    +    }
    +
    +    public final int parse(final byte[] buffer) throws Throwable {
    +        final int version = toInt(buffer, 0, 2);
    +        assert version == 5 : "Version mismatch";
    --- End diff --
    
    Since assertions are disabled by default, throw `ProcessException` instead.
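    
    i.e. something like this (untested; assumes this class can throw `ProcessException`, otherwise any unchecked exception would do):
    
    ```java
    final int version = toInt(buffer, 0, 2);
    if (version != 5) {
        // fail explicitly instead of relying on -ea being enabled
        throw new ProcessException("Version mismatch: expected 5 but was " + version);
    }
    ```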


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501669
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    --- End diff --
    
    `AllowableValue` objects are preferred for `allowableValues` because they're more human-friendly.
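    
    For example (display names and descriptions below are just placeholders):
    
    ```java
    public static final AllowableValue DEST_CONTENT = new AllowableValue("flowfile-content",
            "FlowFile Content", "The parsed fields are written to the FlowFile content as a flat JSON object");
    public static final AllowableValue DEST_ATTRIBUTES = new AllowableValue("flowfile-attribute",
            "FlowFile Attributes", "Each parsed field is written as a FlowFile attribute");
    
    // and then on the property builder:
    // .allowableValues(DEST_CONTENT, DEST_ATTRIBUTES).defaultValue(DEST_CONTENT.getValue())
    ```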


---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501780
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    --- End diff --
    
    Style nit: needs indentation.
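    
    Presumably this refers to the case labels of the switch shown in the diff above. A minimal, illustrative sketch of the indentation being asked for (not the author's code):
    
        switch (destination) {
            case DESTINATION_ATTRIBUTES:
                // populate the parsed fields as FlowFile attributes
                break;
            case DESTINATION_CONTENT:
                // write the parsed fields as a flat JSON object into the FlowFile content
                break;
            default:
                break;
        }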


---

[GitHub] nifi issue #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by PrashanthVenkatesan <gi...@git.apache.org>.
Github user PrashanthVenkatesan commented on the issue:

    https://github.com/apache/nifi/pull/2820
  
    Flow Generator: https://flowalyzer-netflow-generator.soft112.com/
    Flow Template: [NetFlowv5_Test_Template.zip](https://github.com/apache/nifi/files/2169500/NetFlowv5_Test_Template.zip)
    [This template receives data over UDP and uses this custom processor to parse it.]
    https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html - For more details, refer to the NetFlow v5 section in this article.
    
    Hope these will be useful.
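    
    For readers new to the on-wire layout, here is a minimal, illustrative Java sketch (not the PR's parser; the class and variable names are made up) of reading the 24-byte NetFlow v5 packet header described in the Cisco document linked above:
    
        import java.nio.ByteBuffer;
        
        // Hypothetical helper: decodes only the 24-byte NetFlow v5 header from a raw UDP payload.
        // ByteBuffer defaults to network byte order (big-endian), which matches the wire format.
        public class Netflowv5HeaderSketch {
            public static void printHeader(final byte[] datagram) {
                final ByteBuffer buf = ByteBuffer.wrap(datagram);
                final int version    = Short.toUnsignedInt(buf.getShort());   // expected to be 5
                final int count      = Short.toUnsignedInt(buf.getShort());   // 1-30 flow records follow
                final long sysUptime = Integer.toUnsignedLong(buf.getInt());  // ms since device boot
                final long unixSecs  = Integer.toUnsignedLong(buf.getInt());  // export time, seconds
                final long unixNsecs = Integer.toUnsignedLong(buf.getInt());  // residual nanoseconds
                final long flowSeq   = Integer.toUnsignedLong(buf.getInt());  // running count of exported flows
                final int engineType = Byte.toUnsignedInt(buf.get());
                final int engineId   = Byte.toUnsignedInt(buf.get());
                final int sampling   = Short.toUnsignedInt(buf.getShort());
                System.out.printf("version=%d count=%d uptime=%dms seq=%d%n", version, count, sysUptime, flowSeq);
                // Each of the 'count' flow records that follow the header is 48 bytes long.
            }
        }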



---

[GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r203411993
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.getTemplate;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Template.templateID;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and adds attributes to the FlowFile for headers.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from ListenUDP") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields."),
    +        @WritesAttribute(attribute = "templateID", description = "template ID"), @WritesAttribute(attribute = "templateDefinition", description = "Template Definition - one time") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    // To send the template during first run
    +    private transient AtomicBoolean isFirstRun = new AtomicBoolean(true);
    +    private boolean append_raw_to_json = false;
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + "attribute, fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final PropertyDescriptor APPEND_RAW_MESSAGE_TO_JSON = new PropertyDescriptor.Builder().name("APPEND_RAW_MESSAGE_TO_JSON").displayName("Append raw message to JSON")
    +            .description("When using flowfile-content (i.e. JSON output), add the original netflowv5 message to " + "the resulting JSON object. The original message is added as a string to _raw.")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR).required(true).defaultValue("true").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    static final Relationship REL_ONTEMPLATE = new Relationship.Builder().name("template_received")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 template will be transferred to this Relationship.").build();
    +    static final Relationship REL_ONDATA = new Relationship.Builder().name("data_received")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    --- End diff --
    
    The relationship and property descriptor sets can be created statically instead of being rebuilt on every call.
    Check out InvokeHttp for an example.
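    
    A minimal sketch of the pattern being suggested (illustrative only, reusing the descriptors and relationships already declared in this class): build the collections once in static initializers and return them from the getters, rather than rebuilding them on each invocation.
    
        // requires java.util.Arrays and java.util.Collections imports
        private static final List<PropertyDescriptor> PROPERTIES =
                Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION, APPEND_RAW_MESSAGE_TO_JSON));
        
        private static final Set<Relationship> RELATIONSHIPS =
                Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ONTEMPLATE, REL_ONDATA)));
        
        @Override
        public Set<Relationship> getRelationships() {
            return RELATIONSHIPS;
        }
        
        @Override
        public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return PROPERTIES;
        }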


---