You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Mark Payne (Jira)" <ji...@apache.org> on 2022/08/22 18:26:00 UTC

[jira] [Comment Edited] (NIFI-10383) When importing flow that uses InvokeScriptedProcessor, if processor references service, referenced ID is incorrect

    [ https://issues.apache.org/jira/browse/NIFI-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17583138#comment-17583138 ] 

Mark Payne edited comment on NIFI-10383 at 8/22/22 6:25 PM:
------------------------------------------------------------

To replicate this, we can use the following script body (Groovy) in InvokeScriptedProcessor (this is a modification to one of the scripts in the unit tests):
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.commons.codec.binary.Heximport org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.stream.io.StreamUtils
import org.apache.nifi.util.security.MessageDigestUtils
import org.apache.nifi.controller.ControllerServiceclass

TestAbstractProcessor extends AbstractProcessor {    
    def REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that were successfully processed")
            .build();

    final static String attr = "outAttr";
    final static PropertyDescriptor myCustomProp = new PropertyDescriptor.Builder()
            .name("service")
            .displayName("service")
            .description("bla bla bla")
            .expressionLanguageSupported(false)
            .identifiesControllerService(ControllerService.class)
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    @Override
    List<PropertyDescriptor> getSupportedPropertyDescriptors(){
        return [myCustomProp] as List
    }    @Override

    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile requestFlowFile = session.get();
        String myCustomPropValue = context.getProperty(myCustomProp).evaluateAttributeExpressions(requestFlowFile).getValue();
        final byte[] buff = new byte[(int) requestFlowFile.getSize()];
        session.read(requestFlowFile, new InputStreamCallback() {
            @Override
            void process(InputStream inp) throws IOException {
                StreamUtils.fillBuffer(inp, buff);
            }
        });
        final String content = new String(buff);
        final String digest = generateDigest(content + myCustomPropValue);
        requestFlowFile = session.putAttribute(requestFlowFile, attr, digest);
        requestFlowFile = session.write(requestFlowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                out.write(digest.getBytes());
            }
        });
        session.transfer(requestFlowFile, REL_SUCCESS);
    }

    static def generateDigest(String s){
        new String(Hex.encodeHex(MessageDigestUtils.getDigest(s.bytes)));
    }
}

processor = new TestAbstractProcessor();{code}
To replicate, create a Process Group and in the PG add an InvokeScriptedProcessor with the given script body. Create a Controller Service and set the 'service' property to reference the service.

Then download the flow definition and import it again.

This will result in the InvokeScriptedProcessor being invalid because  it references a non-existent service


was (Author: markap14):
To replicate this, we can use the following script body (Groovy) in InvokeScriptedProcessor (this is a modification to one of the scripts in the unit tests):
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.commons.codec.binary.Heximport org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.stream.io.StreamUtils
import org.apache.nifi.util.security.MessageDigestUtils
import org.apache.nifi.controller.ControllerServiceclass

TestAbstractProcessor extends AbstractProcessor {    
    def REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that were successfully processed")
            .build();

    final static String attr = "outAttr";
    final static PropertyDescriptor myCustomProp = new PropertyDescriptor.Builder()
            .name("service")
            .displayName("service")
            .description("bla bla bla")
            .expressionLanguageSupported(false)
            .identifiesControllerService(ControllerService.class)
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    @Override
    List<PropertyDescriptor> getSupportedPropertyDescriptors(){
        return [myCustomProp] as List
    }    @Override

    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    @Override
    void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile requestFlowFile = session.get();
        String myCustomPropValue = context.getProperty(myCustomProp).evaluateAttributeExpressions(requestFlowFile).getValue();
        final byte[] buff = new byte[(int) requestFlowFile.getSize()];
        session.read(requestFlowFile, new InputStreamCallback() {
            @Override
            void process(InputStream inp) throws IOException {
                StreamUtils.fillBuffer(inp, buff);
            }
        });
        final String content = new String(buff);
        final String digest = generateDigest(content + myCustomPropValue);
        requestFlowFile = session.putAttribute(requestFlowFile, attr, digest);
        requestFlowFile = session.write(requestFlowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                out.write(digest.getBytes());
            }
        });
        session.transfer(requestFlowFile, REL_SUCCESS);
    }

    static def generateDigest(String s){
        new String(Hex.encodeHex(MessageDigestUtils.getDigest(s.bytes)));
    }
}

processor = new TestAbstractProcessor();{code}

> When importing flow that uses InvokeScriptedProcessor, if processor references service, referenced ID is incorrect
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-10383
>                 URL: https://issues.apache.org/jira/browse/NIFI-10383
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework, Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> When a flow is imported, if it contains an InvokeScriptedProcessor that uses a Controller Service, the InvokeScriptedProcessor that is created references the ID of the Controller Service when the flow was exported - NOT the ID of the Controller Service that is created during flow import.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)