Posted to users@nifi.apache.org by James McMahon <js...@gmail.com> on 2017/04/03 21:35:55 UTC

UndeclaredThrowableException from InvokeScriptedProcessor

Good evening. I am trying to get this simple update-attribute function to
work in an InvokeScriptedProcessor (ISP). I wish to initialize custom
logging to a distinct log file associated specifically with this
processor. While updating an attribute is (or should be) a very simple
case, making the ISP work with this custom directed logging apparently is
not.
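
The directed logging described above can be sketched with Python's standard `logging` module, which Jython also ships. This is a minimal sketch, not the approach anyone settled on in this thread: the helper name `make_file_logger`, the logger name, and the choice to turn off propagation (so records do not also flow to handlers on the root logger) are assumptions for illustration only.

```python
import logging


def make_file_logger(name, log_filename):
    """Return a logger that writes to log_filename (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Avoid stacking duplicate handlers if this is called more than once
    # in the same interpreter (for example, if initialize() runs again).
    if not logger.handlers:
        handler = logging.FileHandler(log_filename)
        handler.setFormatter(logging.Formatter('%(asctime)-15s %(message)s'))
        logger.addHandler(handler)
    # Keep these records out of any handlers attached to the root logger.
    logger.propagate = False
    return logger
```

Inside the processor this might be called from `initialize()` as `self.log = make_file_logger('UpdateAttributes', LOG_FILENAME)`, after which `self.log.info(...)` writes only to that file.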

I was able to get a simple filename attribute change to work, but a
particular ERROR is thrown every time the ISP wakes up and checks for
flowfiles to process. The error is this:

2017-04-03 10:59:31,370 ERROR [Timer-Driven Process Thread-4] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=1f379.....] Unable to get property descriptors from Processor: java.lang.reflect.UndeclaredThrowableException

I am running NiFi 0.7.1.

Either I've done something obviously wrong in the following code or I'm
not using the ISP properly. I researched this error and found a link (at
Hortonworks) that appeared to be directly related, but unfortunately
incorporating the change to PySet did not resolve my issue. Here is that
link in case you are interested:
https://community.hortonworks.com/questions/75420/invokescriptedprocessor-in-python.html

I also want to point out that I found this approach documented here, as
referenced by Matt B.:
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py

Please note that I had to recreate my code here. Any typos you find are not
in the actual code; they were introduced by my mad typing "skills" here.

Has anyone encountered and fixed this problem, eliminating this error?
Thanks in advance for any help. Code shown below. -Jim

# Approach:
# First, get a simple attribute to update (DONE)
# Second, add directed logging to a specific file (getting many of the
# ERRORs detailed above)
# Third, expand on the update attribute concept to select a small subset of
# attributes, form them into a JSON object, and replace the existing flowfile
# payload with that

import sys
import traceback
import logging
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
from org.python.core import PySet

class UpdateAttributes(Processor) :
     # __rel_success = Relationship.Builder().description("Success").name("success").build()
     def __init__(self) :
          self.__rel_success = Relationship.Builder().description("Success").name("success").build()
          self.log = None

     def initialize(self,context) :
          try :
               LOG_FILENAME='/home/nifi/latest/logs/TestLog.log'
               # Still TBD:
               # FORMAT='%(asctime)-15s %(message)s'
               # formatter = logging.Formatter(FORMAT)
               # self.log = logging.getLogger('UpdateAttributes')
               # handler = logging.FileHandler(LOG_FILENAME)
               # handler.setFormatter(formatter)
               # self.log.addHandler(handler)
          except :
               pass

     def getRelationships(self) :
          return PySet([self.__rel_success])

     def validate(self,context) :
          return None

     def getPropertyDescriptor(self, name) :
          return None
          # try :
          #      descriptor = PropertyDescriptor.Builder().name("filename").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
          #      return [descriptor]
          # except :
          #      pass

     def getPropertyDescriptors(self) :
          return None

     def onPropertyModified(self,descriptor,newValue,oldValue) :
          pass

     def onTrigger(self,context,sessionFactory) :
          session = sessionFactory.createSession()
          try :
               # ensure there is work to do
               flowfile = session.get()
               if flowfile is None :
                    return

               # extract some attribute value of interest
               # fromPropertyValue = context.getProperty("for-attributes").getValue()
               fromAttributeValue = flowfile.getAttribute("filename")

               # set our attribute to the desired new value
               # flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
               flowfile = session.putAttribute(flowfile, "filename", "LARRY_CURLEY_MOE")

               # transfer
               session.transfer(flowfile, self.__rel_success)
               session.commit()
          except:
               session.rollback()
               raise

processor = UpdateAttributes()
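
The third step in the plan above (select a few attributes, form them into a JSON object, and make that the new payload) can be sketched in plain Python. The helper name `attributes_to_json` and the attribute names in the example are hypothetical. Inside the processor the mapping would presumably come from `flowfile.getAttributes()`, and the resulting string would still need to be written into the flowfile content (for example with `session.write` and an `OutputStreamCallback`), which this sketch does not show.

```python
import json


def attributes_to_json(attributes, names):
    """Build a JSON string from a chosen subset of flowfile attributes.

    attributes: a mapping of attribute name -> value (hypothetical input).
    names:      the attribute names to keep.
    Attributes that are missing are skipped rather than emitted as null.
    """
    selected = {}
    for name in names:
        value = attributes.get(name)
        if value is not None:
            selected[name] = value
    # sort_keys makes the output deterministic, which helps when comparing payloads.
    return json.dumps(selected, sort_keys=True)


if __name__ == "__main__":
    attrs = {"filename": "LARRY_CURLEY_MOE", "path": "./", "uuid": "1234"}
    print(attributes_to_json(attrs, ["filename", "path", "not-present"]))
    # prints {"filename": "LARRY_CURLEY_MOE", "path": "./"}
```

Skipping missing attributes is a design choice; emitting them as `null` instead would keep the JSON shape fixed across flowfiles.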

Fwd: Invoking a Processor through REST API

Posted by Arun Manivannan <ar...@arunma.com>.
Hi,

Good morning.


(Reposting this on dev from users because this is a better place.)

I've been using NiFi for a month now and have all good things to say about
it. Truly amazing work!! Thanks a lot!

*Gist*: What's the correct way to invoke a NiFi flow from an external
scheduler such as Control-M (https://en.wikipedia.org/wiki/BMC_Control-M)?

We have around 40 jobs (one per country), each generating about 100
small to medium-sized files (about 1-20 MB each). Each job finishes at a
different time, and its files need to be moved from one Hadoop cluster to
another for further processing. We intend to use NiFi for this and have a
very simple initial prototype (attached screenshot). We are not using
distcp because we would like to add more steps to the flow, such as
reconciliation and Hive external table creation.

The last Control-M job must kick off this flow in NiFi. I explored the
NiFi REST API (https://nifi.apache.org/docs/nifi-docs/rest-api/), but it
seems that I need to know the processor ID and must start and stop the
processor through the API to achieve this.
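
Starting a processor through the REST API, as described above, can be sketched as a small client the scheduler job could run. This assumes the NiFi 1.x API, where `PUT /nifi-api/processors/{id}` accepts a processor entity carrying the processor's current revision and the desired `state`. The base URL, processor ID, and revision number are placeholders; the current revision would first be read with `GET /nifi-api/processors/{id}`, and a secured instance would also need authentication, which is not shown. Unlike the Jython scripts in this thread, this runs outside NiFi under CPython 3.

```python
import json
import urllib.request


def build_start_request(base_url, processor_id, revision_version, client_id=None):
    """Build (but do not send) a PUT request asking NiFi to start one processor.

    Assumes the NiFi 1.x REST API shape: PUT {base_url}/processors/{id} with a
    processor entity whose component.state is "RUNNING". revision_version must
    match the processor's current revision (read it first with a GET).
    """
    revision = {"version": revision_version}
    if client_id is not None:
        revision["clientId"] = client_id
    entity = {
        "revision": revision,
        "component": {"id": processor_id, "state": "RUNNING"},
    }
    return urllib.request.Request(
        url="%s/processors/%s" % (base_url.rstrip("/"), processor_id),
        data=json.dumps(entity).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Sending it would be `urllib.request.urlopen(build_start_request(...))`. Because the revision must be current, a real script would do the GET and the PUT back to back rather than hard-code the version.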

*Questions : *

1. Is there a way to keep the processor running all the time and invoke it
by "name" when needed?
2. Is there a way I could use HandleHttpRequest to activate the ListHDFS
processor?
3. I understand that ListHDFS is marked INPUT_FORBIDDEN. Is it okay to
remove that in my local copy? Is there a reason it is designed that way?
4. For all 40 countries, is it possible to reuse the same flow and
parameterize it on "source cluster", "destination cluster", "source path"
and "destination path"? Do you think there is a concurrency bug waiting to
happen (especially in the ListHDFS component, where I shouldn't be updating
the directory while the files are being listed; a lock, maybe)?

Apologies if the questions are on a tangent.

Best Regards,
Arun

[image: Screen Shot 2017-04-04 at 7.21.43 PM.png]

Re: UndeclaredThrowableException from InvokeScriptedProcessor

Posted by James McMahon <js...@gmail.com>.
Matt, this eliminated the problem. Thanks again for your help. I didn't
realize returning None from getPropertyDescriptors(self) would throw this
error, but now I know. I changed it to:
     emptyList = []
     return emptyList
I've been monitoring nifi-app.log with tail -f, and none of these ERRORs
are being posted now. Thank you, sir.
Cheers,
Jim
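
Why returning `None` breaks things can be illustrated without NiFi at all. The sketch below is a hypothetical stand-in for the host side, not InvokeScriptedProcessor's real code: it assumes only that the caller iterates over whatever `getPropertyDescriptors()` returns. In Python that fails with a `TypeError`; on the Java side, Matt's guess was a NullPointerException, which would then surface wrapped in the `UndeclaredThrowableException` from the log.

```python
class BrokenProcessor(object):
    """Mirrors the original script: returns None instead of a collection."""
    def getPropertyDescriptors(self):
        return None


class FixedProcessor(object):
    """Mirrors the fix from this thread: returns an empty list."""
    def getPropertyDescriptors(self):
        return []


def collect_descriptors(processor):
    """Hypothetical host-side caller that copies the descriptors it is given.

    Returns (descriptors, error_message); exactly one of them is None.
    """
    try:
        return list(processor.getPropertyDescriptors()), None
    except TypeError as exc:
        # list(None) raises TypeError: 'NoneType' object is not iterable
        return None, "Unable to get property descriptors from Processor: %s" % exc
```

The empty list works because "no properties" is still a valid, iterable answer, whereas `None` gives the caller nothing to iterate.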

On Mon, Apr 3, 2017 at 5:59 PM, Matt Burgess <ma...@gmail.com> wrote:

> Sorry empty List.
>
> Sent from my iPhone
>
> On Apr 3, 2017, at 5:52 PM, Matt Burgess <ma...@gmail.com> wrote:
>
> Jim,
>
> I'm not at my keyboard but I'm guessing it is a NullPointerException from
> returning None from getPropertyDescriptors(), try returning an empty PyMap
> instead.
>
> Regards,
> Matt
>
> Sent from my iPhone

Re: UndeclaredThrowableException from InvokeScriptedProcessor

Posted by Matt Burgess <ma...@gmail.com>.
Sorry empty List.

Sent from my iPhone

Re: UndeclaredThrowableException from InvokeScriptedProcessor

Posted by Matt Burgess <ma...@gmail.com>.
Jim,

I'm not at my keyboard but I'm guessing it is a NullPointerException from returning None from getPropertyDescriptors(), try returning an empty PyMap instead.

Regards,
Matt

Sent from my iPhone
