You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by James McMahon <js...@gmail.com> on 2017/04/04 17:02:32 UTC

Multiple log entries from InvokeScriptedProcessor initialize() ?

 Good afternoon. I have been working to configure logging to a specific log
file that associates exclusively with one NiFi processor instance. I
understood from previous posts that InvokeScriptedProcessor processor lends
itself to this requirement. ISP allows for code that runs once - and only
once - when the processor is started.



In this code snippet that follows I show how I establish my logging in my
ISP initialize() method. The problem I am having is that with each
stop/restart of the processor during test and development it appears that I
am initializing new instances of my logger following each restart of the
processor. After my first run my log has

Stooges 2020…

After my second run,

Stooges 2020…

Stooges 2020…

After my third,

Stooges 2020…

Stooges 2020…

Stooges 2020…

And so on. After three runs of my ISP processor I have six log entries
rather than the three I expect.



My code below is lacking something. How do I correct this error so that I
do not output N lines to the log file on run N?



class UpdateProcessors(Processor) :



     def __init__(self) :

          self.__rel_success =
Relationship.Builder().name("success").description("Success").build()



     def initialize(self, context) :

          try :

               # create a logger for this processor…

               self.logger = logging.getLogger('nifi_ISP_1')

               self.logger.setLevel(logging.DEBUG)

               # establish a file handler…

               fh =
logging.FileHandler('/home/nifi/latest/logs/TestLog.log')

               fh.setLevel(logging.DEBUG)

               # create a formatter and associate it with our handler…

               formatter = logging.Formatter('%(asctime)-15s %(message)s')

               fh.setFromatter(formatter)

               self.logger.addHandler(fh)

               self.logger.info('Stooges 2020: Larry, Curley, Moe for
President')

          except :

               pass



     def getRelationships(self) :

.

.

.
processor = UpdateAttributes()

Re: Multiple log entries from InvokeScriptedProcessor initialize() ?

Posted by James McMahon <js...@gmail.com>.
Hi. Wanted to close the loop in case it might help someone in the future
who has a similar interest. This is how I got logging to work from an
InvokeScriptedProcessor processor to a specific log file of my choosing,
executed from a Jython script (some hybrid of java and python that I don't
yet fully understand). I'm running NiFi 0.7.x, and I'm running Python 2.6.6.



Please do take my stuff with a grain of salt. Beyond dogged persistence I
don't have much experience going for me.



I built this Frankenstein monster with:

o initial guidance from Matt B. and Joe W. of this Nifi users group,

o a NiFi example in Git (
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
),

o this stackoverflow discussion (
http://stackoverflow.com/questions/6729268/python-logging-messages-appearing-twice
)

and

o a PySet problem solution from Michael K. available at the Hortonworks
site (
https://community.hortonworks.com/questions/75420/invokescriptedprocessor-in-python.html
).  A big thank you for your assistance.



-Jim



import sys

import traceback

import logging

from org.apache.nifi.processor import Processor

from org.apache.nifi.processor import Relationship

from org.apache.nifi.components import PropertyDescriptor

from org.apache.nifi.processor.util import StandardValidators

from org.python.core import PySet



class UpdateAttributes(Processor) :



                def __init__(self) :

                                self.__rel_success =
Relationship.Builder().name("success").description("Success").build()



                def initialize(self, context) :

                                try :

                                                # create a logger
associated with this InvokeScriptedProcessor…

                                                self.logger =
logging.getLogger('nifi_ISP_1')


self.logger.setLevel(logging.DEBUG)



                                                # DON'T create your logging
file handler here because it must be established, used,

                                                # and discarded with each
processing cycle. Else we get all sorts of wacky multiples

                                                # of outputs in our log
file.

                                except :

                                                pass



                def getRelationships(self) :

                                return PySet([self.__rel_success])



                def validate(self, context) :

                                return None



                def getPropertyDescriptor(self) :

                                return None



                def getPropertyDescriptors(self) :

                                emptyList=[]

                                return emptyList



                def onPropertyModified(self, descriptor, newValue,
oldValue) :

                                pass



                def onTrigger(self, context, sessionFactory) :

                                session = sessionFactory.createSession()

                                try :

                                                # ensure we have some work
to do (TBD: try grabbing many files at once rather than one at a time

#                                                               to optimize
throughput)

flowfile = session.get()

if flowfile is None :

return



                                                # Establish a file handler
that logs even debug messages…


fh.logging.FileHandler('/home.nifi/latest/logs/ISP.log')

                                                fh.setLevel(logging.DEBUG)

                                                # Create a formatter and
add it to the handler…

                                                formatter =
logging.Formatter('%(asctime)-15s %(message)s')

                                                fh.setFormatter(formatter)

                                                self.logger.addHandler(fh)

                                                self.logger.info('About to
process file %s',flowfile.getAttribute("filename"))



                                                # Extract an attribute of
interest…

                                                # fromPropertyValue =
context.getProperty("for-attributes").getValue()

                                                fromAttributeValue =
flowfile.getAttribute("filename")



                                                # Set the attribute to a
new value…

                                                # flowfile =
session.putAttribute(flowfile, "from-property", fromPropertyValue)

                                                flowfile =
session.putAttribute(flowfile, "filename", "Larry_Curley_Moe.txt")

                                                self.logger.info('File
renamed to %s',flowfile.getAttribute("filename"))



                                                # Remove the handler…

fh.close()

while self.logger.handlers :

                self.logger.handlers.pop()

del fh



# transfer…

session.transfer(flowfile, self.__rel_success)

session.commit()



                                except :

                                                session.rollback()

                                                raise()


processor = UpdateAttributes()

On Tue, Apr 4, 2017 at 4:23 PM, James Wing <jv...@gmail.com> wrote:

> James,
>
> I apologize, I did not absorb the fact that you are using the Python
> logging package rather than the built-in Java logging.  My advice below is
> not helpful.
>
> Are you aware that you can use the Java logging from within scripts?  I
> believe you can configure it to provide the various files you need.
>
> On Tue, Apr 4, 2017 at 11:35 AM, James McMahon <js...@gmail.com>
> wrote:
>
>> How many distinct log files will that logback approach permit me James? I
>> have six different workflow paths for which I want to log to separate and
>> distinct log files.
>>
>> From this initialization, which of my python logging commands would stay
>> in initialize() and which would move to the logback.xml?
>>               self.logger = logging.getLogger('nifi_ISP_1')
>>
>>                self.logger.setLevel(logging.DEBUG)
>>
>>                # establish a file handler…
>>
>>                fh = logging.FileHandler('/home/nif
>> i/latest/logs/TestLog.log')
>>
>>                fh.setLevel(logging.DEBUG)
>>
>>                # create a formatter and associate it with our handler…
>>
>>                formatter = logging.Formatter('%(asctime)-15s
>> %(message)s')
>>
>>                fh.setFromatter(formatter)
>>
>>                self.logger.addHandler(fh)
>>
>>                self.logger.info('Stooges 2020: Larry, Curley, Moe for
>> President')
>>
>>
>> So the problem here is that when I Stop the processor, the file handles
>> are not eliminated? The initialize() runs at Start only, but if it has been
>> stopped and started one or more times prior it inherits all that previous
>> baggage. Is that right?
>>
>> Thanks very much.
>>
>> Jim
>>
>> On Tue, Apr 4, 2017 at 2:18 PM, James Wing <jv...@gmail.com> wrote:
>>
>>> James,
>>>
>>> I suspect your call to self.logger.addHandler(fh) is cumulatively adding
>>> to your log results as initialize() is called again.  Can you define the
>>> log file and formatting in your NiFi's conf/logback.xml (no restart
>>> required)?  Then you can safely call getLogger() and access the shared
>>> configuration.
>>>
>>>
>>> Thanks,
>>>
>>> James
>>>
>>> On Tue, Apr 4, 2017 at 10:02 AM, James McMahon <js...@gmail.com>
>>> wrote:
>>>
>>>> Good afternoon. I have been working to configure logging to a specific
>>>> log file that associates exclusively with one NiFi processor instance. I
>>>> understood from previous posts that InvokeScriptedProcessor processor lends
>>>> itself to this requirement. ISP allows for code that runs once - and only
>>>> once - when the processor is started.
>>>>
>>>>
>>>>
>>>> In this code snippet that follows I show how I establish my logging in
>>>> my ISP initialize() method. The problem I am having is that with each
>>>> stop/restart of the processor during test and development it appears that I
>>>> am initializing new instances of my logger following each restart of the
>>>> processor. After my first run my log has
>>>>
>>>> Stooges 2020…
>>>>
>>>> After my second run,
>>>>
>>>> Stooges 2020…
>>>>
>>>> Stooges 2020…
>>>>
>>>> After my third,
>>>>
>>>> Stooges 2020…
>>>>
>>>> Stooges 2020…
>>>>
>>>> Stooges 2020…
>>>>
>>>> And so on. After three runs of my ISP processor I have six log entries
>>>> rather than the three I expect.
>>>>
>>>>
>>>>
>>>> My code below is lacking something. How do I correct this error so that
>>>> I do not output N lines to the log file on run N?
>>>>
>>>>
>>>>
>>>> class UpdateProcessors(Processor) :
>>>>
>>>>
>>>>
>>>>      def __init__(self) :
>>>>
>>>>           self.__rel_success = Relationship.Builder().name("s
>>>> uccess").description("Success").build()
>>>>
>>>>
>>>>
>>>>      def initialize(self, context) :
>>>>
>>>>           try :
>>>>
>>>>                # create a logger for this processor…
>>>>
>>>>                self.logger = logging.getLogger('nifi_ISP_1')
>>>>
>>>>                self.logger.setLevel(logging.DEBUG)
>>>>
>>>>                # establish a file handler…
>>>>
>>>>                fh = logging.FileHandler('/home/nif
>>>> i/latest/logs/TestLog.log')
>>>>
>>>>                fh.setLevel(logging.DEBUG)
>>>>
>>>>                # create a formatter and associate it with our handler…
>>>>
>>>>                formatter = logging.Formatter('%(asctime)-15s
>>>> %(message)s')
>>>>
>>>>                fh.setFromatter(formatter)
>>>>
>>>>                self.logger.addHandler(fh)
>>>>
>>>>                self.logger.info('Stooges 2020: Larry, Curley, Moe for
>>>> President')
>>>>
>>>>           except :
>>>>
>>>>                pass
>>>>
>>>>
>>>>
>>>>      def getRelationships(self) :
>>>>
>>>> .
>>>>
>>>> .
>>>>
>>>> .
>>>> processor = UpdateAttributes()
>>>>
>>>
>>>
>>
>

Re: Multiple log entries from InvokeScriptedProcessor initialize() ?

Posted by James Wing <jv...@gmail.com>.
James,

I apologize, I did not absorb the fact that you are using the Python
logging package rather than the built-in Java logging.  My advice below is
not helpful.

Are you aware that you can use the Java logging from within scripts?  I
believe you can configure it to provide the various files you need.

On Tue, Apr 4, 2017 at 11:35 AM, James McMahon <js...@gmail.com> wrote:

> How many distinct log files will that logback approach permit me James? I
> have six different workflow paths for which I want to log to separate and
> distinct log files.
>
> From this initialization, which of my python logging commands would stay
> in initialize() and which would move to the logback.xml?
>               self.logger = logging.getLogger('nifi_ISP_1')
>
>                self.logger.setLevel(logging.DEBUG)
>
>                # establish a file handler…
>
>                fh = logging.FileHandler('/home/nif
> i/latest/logs/TestLog.log')
>
>                fh.setLevel(logging.DEBUG)
>
>                # create a formatter and associate it with our handler…
>
>                formatter = logging.Formatter('%(asctime)-15s %(message)s')
>
>                fh.setFromatter(formatter)
>
>                self.logger.addHandler(fh)
>
>                self.logger.info('Stooges 2020: Larry, Curley, Moe for
> President')
>
>
> So the problem here is that when I Stop the processor, the file handles
> are not eliminated? The initialize() runs at Start only, but if it has been
> stopped and started one or more times prior it inherits all that previous
> baggage. Is that right?
>
> Thanks very much.
>
> Jim
>
> On Tue, Apr 4, 2017 at 2:18 PM, James Wing <jv...@gmail.com> wrote:
>
>> James,
>>
>> I suspect your call to self.logger.addHandler(fh) is cumulatively adding
>> to your log results as initialize() is called again.  Can you define the
>> log file and formatting in your NiFi's conf/logback.xml (no restart
>> required)?  Then you can safely call getLogger() and access the shared
>> configuration.
>>
>>
>> Thanks,
>>
>> James
>>
>> On Tue, Apr 4, 2017 at 10:02 AM, James McMahon <js...@gmail.com>
>> wrote:
>>
>>> Good afternoon. I have been working to configure logging to a specific
>>> log file that associates exclusively with one NiFi processor instance. I
>>> understood from previous posts that InvokeScriptedProcessor processor lends
>>> itself to this requirement. ISP allows for code that runs once - and only
>>> once - when the processor is started.
>>>
>>>
>>>
>>> In this code snippet that follows I show how I establish my logging in
>>> my ISP initialize() method. The problem I am having is that with each
>>> stop/restart of the processor during test and development it appears that I
>>> am initializing new instances of my logger following each restart of the
>>> processor. After my first run my log has
>>>
>>> Stooges 2020…
>>>
>>> After my second run,
>>>
>>> Stooges 2020…
>>>
>>> Stooges 2020…
>>>
>>> After my third,
>>>
>>> Stooges 2020…
>>>
>>> Stooges 2020…
>>>
>>> Stooges 2020…
>>>
>>> And so on. After three runs of my ISP processor I have six log entries
>>> rather than the three I expect.
>>>
>>>
>>>
>>> My code below is lacking something. How do I correct this error so that
>>> I do not output N lines to the log file on run N?
>>>
>>>
>>>
>>> class UpdateProcessors(Processor) :
>>>
>>>
>>>
>>>      def __init__(self) :
>>>
>>>           self.__rel_success = Relationship.Builder().name("s
>>> uccess").description("Success").build()
>>>
>>>
>>>
>>>      def initialize(self, context) :
>>>
>>>           try :
>>>
>>>                # create a logger for this processor…
>>>
>>>                self.logger = logging.getLogger('nifi_ISP_1')
>>>
>>>                self.logger.setLevel(logging.DEBUG)
>>>
>>>                # establish a file handler…
>>>
>>>                fh = logging.FileHandler('/home/nif
>>> i/latest/logs/TestLog.log')
>>>
>>>                fh.setLevel(logging.DEBUG)
>>>
>>>                # create a formatter and associate it with our handler…
>>>
>>>                formatter = logging.Formatter('%(asctime)-15s
>>> %(message)s')
>>>
>>>                fh.setFromatter(formatter)
>>>
>>>                self.logger.addHandler(fh)
>>>
>>>                self.logger.info('Stooges 2020: Larry, Curley, Moe for
>>> President')
>>>
>>>           except :
>>>
>>>                pass
>>>
>>>
>>>
>>>      def getRelationships(self) :
>>>
>>> .
>>>
>>> .
>>>
>>> .
>>> processor = UpdateAttributes()
>>>
>>
>>
>

Re: Multiple log entries from InvokeScriptedProcessor initialize() ?

Posted by James McMahon <js...@gmail.com>.
How many distinct log files will that logback approach permit me James? I
have six different workflow paths for which I want to log to separate and
distinct log files.

From this initialization, which of my python logging commands would stay in
initialize() and which would move to the logback.xml?
              self.logger = logging.getLogger('nifi_ISP_1')

               self.logger.setLevel(logging.DEBUG)

               # establish a file handler…

               fh = logging.FileHandler('/home/
nifi/latest/logs/TestLog.log')

               fh.setLevel(logging.DEBUG)

               # create a formatter and associate it with our handler…

               formatter = logging.Formatter('%(asctime)-15s %(message)s')

               fh.setFromatter(formatter)

               self.logger.addHandler(fh)

               self.logger.info('Stooges 2020: Larry, Curley, Moe for
President')


So the problem here is that when I Stop the processor, the file handles are
not eliminated? The initialize() runs at Start only, but if it has been
stopped and started one or more times prior it inherits all that previous
baggage. Is that right?

Thanks very much.

Jim

On Tue, Apr 4, 2017 at 2:18 PM, James Wing <jv...@gmail.com> wrote:

> James,
>
> I suspect your call to self.logger.addHandler(fh) is cumulatively adding
> to your log results as initialize() is called again.  Can you define the
> log file and formatting in your NiFi's conf/logback.xml (no restart
> required)?  Then you can safely call getLogger() and access the shared
> configuration.
>
>
> Thanks,
>
> James
>
> On Tue, Apr 4, 2017 at 10:02 AM, James McMahon <js...@gmail.com>
> wrote:
>
>> Good afternoon. I have been working to configure logging to a specific
>> log file that associates exclusively with one NiFi processor instance. I
>> understood from previous posts that InvokeScriptedProcessor processor lends
>> itself to this requirement. ISP allows for code that runs once - and only
>> once - when the processor is started.
>>
>>
>>
>> In this code snippet that follows I show how I establish my logging in my
>> ISP initialize() method. The problem I am having is that with each
>> stop/restart of the processor during test and development it appears that I
>> am initializing new instances of my logger following each restart of the
>> processor. After my first run my log has
>>
>> Stooges 2020…
>>
>> After my second run,
>>
>> Stooges 2020…
>>
>> Stooges 2020…
>>
>> After my third,
>>
>> Stooges 2020…
>>
>> Stooges 2020…
>>
>> Stooges 2020…
>>
>> And so on. After three runs of my ISP processor I have six log entries
>> rather than the three I expect.
>>
>>
>>
>> My code below is lacking something. How do I correct this error so that I
>> do not output N lines to the log file on run N?
>>
>>
>>
>> class UpdateProcessors(Processor) :
>>
>>
>>
>>      def __init__(self) :
>>
>>           self.__rel_success = Relationship.Builder().name("s
>> uccess").description("Success").build()
>>
>>
>>
>>      def initialize(self, context) :
>>
>>           try :
>>
>>                # create a logger for this processor…
>>
>>                self.logger = logging.getLogger('nifi_ISP_1')
>>
>>                self.logger.setLevel(logging.DEBUG)
>>
>>                # establish a file handler…
>>
>>                fh = logging.FileHandler('/home/nif
>> i/latest/logs/TestLog.log')
>>
>>                fh.setLevel(logging.DEBUG)
>>
>>                # create a formatter and associate it with our handler…
>>
>>                formatter = logging.Formatter('%(asctime)-15s
>> %(message)s')
>>
>>                fh.setFromatter(formatter)
>>
>>                self.logger.addHandler(fh)
>>
>>                self.logger.info('Stooges 2020: Larry, Curley, Moe for
>> President')
>>
>>           except :
>>
>>                pass
>>
>>
>>
>>      def getRelationships(self) :
>>
>> .
>>
>> .
>>
>> .
>> processor = UpdateAttributes()
>>
>
>

Re: Multiple log entries from InvokeScriptedProcessor initialize() ?

Posted by James Wing <jv...@gmail.com>.
James,

I suspect your call to self.logger.addHandler(fh) is cumulatively adding to
your log results as initialize() is called again.  Can you define the log
file and formatting in your NiFi's conf/logback.xml (no restart required)?
Then you can safely call getLogger() and access the shared configuration.


Thanks,

James

On Tue, Apr 4, 2017 at 10:02 AM, James McMahon <js...@gmail.com> wrote:

> Good afternoon. I have been working to configure logging to a specific log
> file that associates exclusively with one NiFi processor instance. I
> understood from previous posts that InvokeScriptedProcessor processor lends
> itself to this requirement. ISP allows for code that runs once - and only
> once - when the processor is started.
>
>
>
> In this code snippet that follows I show how I establish my logging in my
> ISP initialize() method. The problem I am having is that with each
> stop/restart of the processor during test and development it appears that I
> am initializing new instances of my logger following each restart of the
> processor. After my first run my log has
>
> Stooges 2020…
>
> After my second run,
>
> Stooges 2020…
>
> Stooges 2020…
>
> After my third,
>
> Stooges 2020…
>
> Stooges 2020…
>
> Stooges 2020…
>
> And so on. After three runs of my ISP processor I have six log entries
> rather than the three I expect.
>
>
>
> My code below is lacking something. How do I correct this error so that I
> do not output N lines to the log file on run N?
>
>
>
> class UpdateProcessors(Processor) :
>
>
>
>      def __init__(self) :
>
>           self.__rel_success = Relationship.Builder().name("
> success").description("Success").build()
>
>
>
>      def initialize(self, context) :
>
>           try :
>
>                # create a logger for this processor…
>
>                self.logger = logging.getLogger('nifi_ISP_1')
>
>                self.logger.setLevel(logging.DEBUG)
>
>                # establish a file handler…
>
>                fh = logging.FileHandler('/home/
> nifi/latest/logs/TestLog.log')
>
>                fh.setLevel(logging.DEBUG)
>
>                # create a formatter and associate it with our handler…
>
>                formatter = logging.Formatter('%(asctime)-15s %(message)s')
>
>                fh.setFromatter(formatter)
>
>                self.logger.addHandler(fh)
>
>                self.logger.info('Stooges 2020: Larry, Curley, Moe for
> President')
>
>           except :
>
>                pass
>
>
>
>      def getRelationships(self) :
>
> .
>
> .
>
> .
> processor = UpdateAttributes()
>