Posted to users@nifi.apache.org by Elemir Stevko <El...@versent.com.au> on 2019/03/21 07:05:33 UTC

Problem with property descriptor in InvokeScriptedProcessor

Hello,

I would like to implement a stateless S3 lister as a Python script using InvokeScriptedProcessor in NiFi 1.9.0. I have based my script on the CompressFlowFile processor:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_compress.py

I want to declare my property 'Bucket' to be required and non-empty, so I have changed the getPropertyDescriptors method to:

def getPropertyDescriptors(self) :
    descriptor = PropertyDescriptor.Builder().name("Bucket").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()

I am getting an error when storing the script code in the processor:

Unable to get property descriptors from Processor: java.lang.reflect.UndeclaredThrowableException.

If I define the descriptor using allowableValues, it works fine:

descriptor = PropertyDescriptor.Builder().name("Bucket").allowableValues("compress", "decompress").required(True).build()

Could you please let me know what is causing this problem and how I can fix it?

Best regards,
Elemir

Here is the complete test code of the script:

import sys
import traceback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self, text):
        self.text = text
        pass

  def process(self, inputStream, outputStream):
    outputStream.write(bytearray(self.text.encode('utf-8')))

class ListS3Files(Processor) :
    __rel_success = Relationship.Builder().description("Success").name("success").build()

    def __init__(self) :
        pass

    def initialize(self, context) :
        pass

    def getRelationships(self) :
        return set([self.__rel_success])

    def validate(self, context) :
        pass

    def getPropertyDescriptors(self) :
        descriptor = PropertyDescriptor.Builder().name("Bucket").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
        #descriptor = PropertyDescriptor.Builder().name("Bucket").allowableValues("val1", "val2").required(True).build()
        return [descriptor]

    def onPropertyModified(self, descriptor, newValue, oldValue) :
        pass

    def onTrigger(self, context, sessionFactory) :
        session = sessionFactory.createSession()
        try :
            bucket = context.getProperty("Bucket").getValue()
            if not bucket:
                return

            for file in ['file1', 'file2']:
                flowfile = session.create()
                if flowfile is None :
                    return

                flowfile = session.write(flowfile, PyStreamCallback(bucket+'/'+file))
                # transfer
                session.transfer(flowfile, self.__rel_success)

            session.commit()
        except :
            print sys.exc_info()[0]
            print "Exception in TestReader:"
            print '-' * 60
            traceback.print_exc(file=sys.stdout)
            print '-' * 60

            session.rollback(True)
            raise

processor = ListS3Files()

Re: Problem with property descriptor in InvokeScriptedProcessor

Posted by Elemir Stevko <El...@versent.com.au>.
Hi Denes,

Yes, that was it. I missed the import you mentioned. Thanks a lot for your help and also for suggesting setting the logging level to debug!

Best regards,
Elemir

From: Denes Arvay <de...@apache.org>
Reply-To: "users@nifi.apache.org" <us...@nifi.apache.org>
Date: Thursday, 21 March 2019 at 6:22 pm
To: "users@nifi.apache.org" <us...@nifi.apache.org>
Subject: Re: Problem with property descriptor in InvokeScriptedProcessor

Hi Elemir,

Setting the log level to DEBUG outputs more details. The cause of the exception appears to be the following:
javax.script.ScriptException: NameError: global name 'StandardValidators' is not defined in <script> at line number 32

This means an import is missing from your Python script. Adding "from org.apache.nifi.processor.util import StandardValidators" to the import list fixes the issue.

Best,
Denes



Re: Problem with property descriptor in InvokeScriptedProcessor

Posted by Denes Arvay <de...@apache.org>.
Hi Elemir,

Setting the log level to DEBUG outputs more details. The cause of the
exception appears to be the following:
javax.script.ScriptException: NameError: global name 'StandardValidators'
is not defined in <script> at line number 32

This means an import is missing from your Python script. Adding "from
org.apache.nifi.processor.util import StandardValidators" to the import
list fixes the issue.
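
As a side note, here is a plain-Python sketch (no NiFi required) of why the
script was accepted but failed only when the property descriptors were
requested, and why the allowableValues variant worked. Python looks up a
global name when the line that uses it runs, not when the class is defined.
FakeProcessor, UndefinedValidators and try_call are hypothetical names made
up for this illustration; they are not part of the NiFi API.

```python
# Illustration only: FakeProcessor stands in for the scripted processor and
# UndefinedValidators is deliberately never imported, mirroring the missing
# StandardValidators import in the original script.

class FakeProcessor(object):
    def descriptors_with_validator(self):
        # The global name is resolved only when this line executes.
        return [UndefinedValidators.NON_EMPTY_VALIDATOR]

    def descriptors_with_allowable_values(self):
        # No undefined name is referenced, so this variant succeeds.
        return ["compress", "decompress"]


def try_call(method):
    """Call method and return (result, error message or None)."""
    try:
        return method(), None
    except NameError as exc:
        return None, str(exc)


# Defining the class and creating the instance raises nothing.
processor = FakeProcessor()

ok_result, ok_error = try_call(processor.descriptors_with_allowable_values)
bad_result, bad_error = try_call(processor.descriptors_with_validator)

print(ok_result)   # the list of allowable values
print(bad_error)   # a NameError message naming UndefinedValidators
```

In the real script the same thing presumably happens inside
getPropertyDescriptors, and the NameError then reaches the UI wrapped in the
UndeclaredThrowableException, which is why the DEBUG log was needed to see it.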

Best,
Denes

