You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Giuseppe Gerla (Jira)" <ji...@apache.org> on 2023/11/23 21:34:00 UTC

[jira] [Updated] (NIFI-12407) SNMP shall include the sender IP and port in the attributes of the flow file

     [ https://issues.apache.org/jira/browse/NIFI-12407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Giuseppe Gerla updated NIFI-12407:
----------------------------------
    Affects Version/s: 1.23.2
                           (was: 1.16.3)

> SNMP shall include the sender IP and port in the attributes of the flow file
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-12407
>                 URL: https://issues.apache.org/jira/browse/NIFI-12407
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.23.2
>            Reporter: Giuseppe Gerla
>            Priority: Major
>              Labels: SNMP
>
> SNMP shall include the sender IP and port in the attributes of the flow file
> To do that, we need to modify two classes: SNMPTrapReceiver and SNMPUtils.
> In SNMPUtils, add a new method that builds attributes from the event (we have included only the sender IP and port):
>     public static Map<String, String> getEventAttributeMap(final CommandResponderEvent event) {
>         final Map<String, String> attributes = new HashMap<>();
>         attributes.computeIfAbsent(SNMP_PROP_PREFIX + "sender", v -> String.valueOf(event.getPeerAddress()));
>         return attributes;
>     }
>  
> In SNMPTrapReceiver, modify the signature of the createFlowFile method to accept a CommandResponderEvent instead of a PDU, and call the new SNMPUtils method to add the new attribute with the sender information.
>     private FlowFile createFlowFile(final ProcessSession processSession, final CommandResponderEvent event) {
>         final PDU pdu = event.getPDU();
>         FlowFile flowFile = processSession.create();
>         final Map<String, String> attributes;
>         if (pdu instanceof PDUv1) {
>             attributes = SNMPUtils.getV1TrapPduAttributeMap((PDUv1) pdu);
>         } else {
>             attributes = SNMPUtils.getPduAttributeMap(pdu);
>         }
>         attributes.putAll(SNMPUtils.getEventAttributeMap(event));
>         flowFile = processSession.putAllAttributes(flowFile, attributes);
>         return flowFile;
>     }
>  
>     @Override
>     public void processPdu(final CommandResponderEvent event) {
>         final PDU pdu = event.getPDU();
>         if (isValidTrapPdu(pdu)) {
>             final ProcessSession processSession = processSessionFactory.createSession();
>             final FlowFile flowFile = createFlowFile(processSession, event);
>             processSession.getProvenanceReporter().create(flowFile, event.getPeerAddress() + "/" + pdu.getRequestID());
>             if (pdu.getErrorStatus() == PDU.noError) {
>                 processSession.transfer(flowFile, REL_SUCCESS);
>             } else {
>                 processSession.transfer(flowFile, REL_FAILURE);
>             }
>             processSession.commitAsync();
>         } else {
>             logger.error("Request timed out or parameters are incorrect.");
>         }
>     }
>  
> To test it
>  
> @Test
>     void testTrapReceiverSavesSenderIpOnFlowFile() {
>         final CommandResponderEvent mockEvent = mock(CommandResponderEvent.class);
>         Address address = UdpAddress.parse("192.168.0.1/12345");
>         when(mockEvent.getPeerAddress()).thenReturn(address);
>  
>         when(mockPdu.getType()).thenReturn(PDU.INFORM);
>         final Vector<VariableBinding> vbs = new Vector<>();
>         doReturn(vbs).when(mockPdu).getVariableBindings();
>         when(mockEvent.getPDU()).thenReturn(mockPdu);
>         when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
>  
>         snmpTrapReceiver.processPdu(mockEvent);
>  
>         final List<MockFlowFile> flowFiles = mockProcessSession.getFlowFilesForRelationship(AmsSnmpTrapListener.REL_SUCCESS);
>         final FlowFile flowFile = flowFiles.get(0);
>  
>         assertEquals("192.168.0.1/12345", flowFile.getAttribute("snmp$sender"));
>     }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)