You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by Ananth G <an...@gmail.com> on 2017/12/14 10:20:46 UTC

[Discuss] Design of the python execution operator

Hello All,

I would like to submit the design for the Python execution operator before I raise the pull request so that I can refine the implementation based on feedback. Could you please provide feedback on the design if any and I will raise the PR accordingly. 

- This operator is for the JIRA ticket raised here https://issues.apache.org/jira/browse/APEXMALHAR-2260 <https://issues.apache.org/jira/browse/APEXMALHAR-2260>
- The operator embeds a python interpreter in the operator JVM process space and is not external to the JVM.
- The implementation is proposing the use of Java Embedded Python ( JEP ) given here https://github.com/ninia/jep <https://github.com/ninia/jep>
- The JEP engine is under zlib/libpng license. Since this is an approved license under https://www.apache.org/legal/resolved.html#category-a <https://www.apache.org/legal/resolved.html#category-a> I am assuming it is ok for the community to approve the inclusion of this library  
- Python integration is a messy piece due to the nature of dynamic libraries. All python libraries need to be natively installed. This also means we will not be able bundle python libraries and dependencies as part of the build into the target JVM container. Hence this operator has the current limitation of the python binaries installed through an external process on all of the YARN nodes for now.
- The JEP maven dependency jar in the POM is a JNI wrapper around the dynamic library that is installed externally to the Apex installation process on all of the YARN nodes.
- Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 <https://issues.apache.org/jira/browse/APEXCORE-796> to solve this issue in the future.
- The python operator implementation can be extended to py4J based implementation ( as opposed to in-memory model like JEP ) in the future if required be. JEP is the implementation based on an in-memory design pattern.
- The python operator allows for 4 major API patterns
    - Execute a method call by accepting parameters to pass to the interpreter
    - Execute a python script as given in a file path
    - Evaluate an expression and allows for passing of variables between the java code and the python in-memory interpreter bridge
    - A handy method wherein a series of instructions can be passed in one single java call ( executed as a sequence of python eval instructions under the hood ) 
- Automatic garbage collection of the variables that are passed from java code to the in memory python interpreter
- Support for all major python libraries. Tensorflow, Keras, Scikit, xgboost. Preliminary tests for these libraries seem to work as per code here : https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython <https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython> 
- The implementation allows for SLA based execution model. i.e. the operator is given a chance to execute the python code and if not complete within a time out, the operator code returns back null.
- A tuple that has become a straggler as per previous point will automatically be drained off to a different port so that downstream operators can still consume the straggler if they want to when the results arrive.
- Because of the nature of python being an interpreter and if a previous tuple is being still processed, there is chance of a back pressure pattern building up very quickly. Hence this operator works on the concept of a worker pool. The Python operator uses a configurable number of worker thread each of which embed the Python interpreter within their processing space. i.e. it is in fact a collection of python ink memory interpreters inside the Python operator implementation.
- The operator chooses one of the threads at runtime basing on their busy state thus allowing for back-pressure issues to be resolved automatically.
- There is a first class support for Numpy in JEP. Java arrays would be convertible to the Python Numpy arrays and vice versa and share the same memory addresses for efficiency reasons. 
- The base operator implements dynamic partitioning based on a thread starvation policy. At each checkpoint, it checks how much percentage of the requests resulted in starved threads and if the starvation exceeds a configured percentage, a new instance of the operator is provisioned for every such instance of the operator
- The operator provides the notion of a worker execution mode. There are two worker modes that are passed in each of the above calls from the user. ALL or ANY.  Because python interpreter is state based engine, a newly dynamically partitioned operator might not be in the exact state of the remaining operators. Hence the operator has this notion of worker execution mode. Any call ( any of the 4 calls mentioned above ) called with ALL execution mode will be executed on all the workers of the worker thread pool as well as the dynamically portioned instance whenever such an instance is provisioned.  
- The base operator implementation has a method that can be overridden to implement the logic that needs to be executed for each tuple. The base operator default implementation is a simple NO-OP.
- The operator automatically picks up the least busy of the thread pool worker which has JEP embedded in it to execute the call. 
- The JEP based installation will not support non Cpython modules. All of the major python libraries are cpython based and hence I believe this is of a lesser concern. If we hit a roadblock when a new python library being a non-Cpython based library needs to be run, then we could implement the ApexPythonEngine interface to something like Py4J which involves interprocess communication. 
- The python operator requires the user to set the library path java.library.path for the operator to make use of the dynamic libraries of the corresponding platform. This has to be passed in as the JVM options. Failing to do so will result in the operator failing to load the interpreter properly. 
- The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6. Numpy >= 1.7 is supported.
- There is no support for virtual environments yet. In case of multiple python versions on the node, to include the right python version for the apex operator, ensure that the environment variables and the dynamic library path are set appropriately. This is a workaround and I hope APEXCORE-796 will solve this issue as well. 


Regards,
Ananth 


Re: [Discuss] Design of the python execution operator

Posted by Thomas Weise <th...@apache.org>.
Serialization and resource manager are not involved in CONTAINER_LOCAL.

However, CONTAINER_LOCAL is probably not what you want, since it would
force all partitions to run in the same JVM.

Thomas


On Fri, Dec 22, 2017 at 1:19 PM, Ananth G <an...@gmail.com> wrote:

> I guess my comment below regarding overhead of serialisation in container
> local is wrong ? Nevertheless having a local thread implementation gives
> some benefits . For example I am using to whether sleep if there is no
> request in the queue or spin checking for request presence in the request
> queue etc to take care of no delays in the request queue processing itself.
>
> Regards,
> Ananth
>
> > On 23 Dec 2017, at 6:50 am, Ananth G <an...@gmail.com> wrote:
> >
> >
> > Thanks for the comments Thomas and Pramod.
> >
> > Apologies for the delayed response on this thread.
> >
> > > I believe the thread implementation still adds some value over a
> container local approach. It is more of a “thread local” equivalent which
> is more efficient as opposed to a container local implementation. Also the
> number of worker threads is configurable. Setting the value of 1 will let
> the user to not do this ( although I do not see a reason for why not ).
> There is always the over head of serialise/de-serialize cycle even for a
> container local approach and there is the additional possibility of
> container local not being honoured by the Resource manager based on the
> state of the resources.
> >
> > > Regarding the configurable key to ensure all tuples in a window are
> processed, I am adding a switch which can let the user choose ( and javadoc
> that clearly points out issues if not waiting for the tuples to be
> completely processed ). There are pros and cons for this and letting the
> user decide might be a better approach. The reason why I mention cons for
> waiting the tuples to complete ( apart from the reason that Thomas
> mentioned ) is that if one of the commands that the user wrote is an
> erroneous one, all the subsequent calls to that interpreter thread cal
> fail. An example use case is that tuple A set some value for variable x and
> tuple B that is coming next is making use of the variable x. Syntactically
> expression for tuple B is valid but just that it depends on variable x. Now
> if the variable x is not in memory because tuple A is a straggler resulting
> in tuple B resulting in an erroneous interpreter state. Hence the operator
> might stall definitely as end window will be stalled forever resulting in
> killing of the operator ultimately. This is also because the erroneous
> command corrupted the state of the interpreter itself. Of course this can
> happen to all of the threads in the interpreter worker pool resulting in
> this state as well. Perhaps an improvement of the current implementation is
> to detect all such stalled interpreters for more than x windows and rebuild
> the interpreter thread when such a situation is detected.
> >
> > > Thanks for the IdleTimeoutHandler tip as this helped me to ensure that
> the stragglers are drained out irrespective of a new tuple coming in for
> processing. In the previous iteration, the stragglers could only be drained
> when there is a new tuple that came in processing as delayed responses
> queue could only be checked when there is some activity on the main thread.
> >
> > > Thanks for raising the point about the virtual environments: This is a
> point I missed mentioning in the design description below. There is no
> support for virtual environments yet in JEP and hence the current
> limitation. However the work around is simple. As part of the application
> configuration, we need to provide the JAVA_LIBRARY_PATH which contains the
> path to the JEP dynamic libraries. If there are multiple python installs (
> and hence multiple JEP libraries to choose from for each of the apex
> applications that are being deployed), setting the right path for the
> operator JVM will result in picking the corresponding python interpreter
> version. This also essentially means that we cannot have a thread local
> deployment configuration of two python operators that belong to different
> python versions in the same JVM.  The Docker approach ticket should be the
> right fix for virtual environments issue? https://issues.apache.org/
> jira/browse/APEXCORE-796 <https://issues.apache.org/
> jira/browse/APEXCORE-796> ( but still might not solve the thread local
> configuration deployment )
> >
> > Regards,
> > Ananth
> >
> >
> >> On 21 Dec 2017, at 11:01 am, Pramod Immaneni <pramod@datatorrent.com
> <ma...@datatorrent.com>> wrote:
> >>
> >> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <thw@apache.org <mailto:
> thw@apache.org>> wrote:
> >>
> >>> It is exciting to see this move forward, the ability to use Python
> opens
> >>> many new possibilities.
> >>>
> >>> Regarding use of worker threads, this is a pattern that we are using
> >>> elsewhere (for example in the Kafka input operator). When the operator
> >>> performs blocking operations and consumes little memory and/or CPU,
> then it
> >>> is more economic to first use threads to increase parallelism and
> >>> throughput (up to a limit), and then the more expensive containers for
> >>> horizontal scaling (multiple threads to make good use of container
> >>> resources and then scale using the usual partitioning).
> >>>
> >>
> >> I think there is a difference. In case of kafka or other input operators
> >> the threads are less constrained. They can operate with independence and
> >> can dictate the pace limited only by back pressure. In this case the
> >> operator is most likely going to be downsteram in the DAG and would have
> >> constraints for processing guarantees. For scalability, container local
> >> could also be used as a substitue for multiple threads without
> resorting to
> >> using separate containers. I can understand use of a separate thread to
> be
> >> able to get around problems like stalled processing but would first try
> to
> >> see if something like container local would work for scaling.
> >>
> >>
> >>> It is also correct that generally there is no ordering guarantee
> within a
> >>> streaming window, and that would be the case when multiple input ports
> are
> >>> present as well. (The platform cannot guarantee such ordering, this
> would
> >>> need to be done by the operator).
> >>
> >>
> >>
> >>> Idempotency can be expensive (latency and/or space complexity), and
> not all
> >>> applications need it (like certain use cases that process record by
> record
> >>> and don't accumulate state). An example might be Python logic that is
> used
> >>> for scoring against a model that was built offline. Idempotency would
> >>> actually be rather difficult to implement, since the operator would
> need to
> >>> remember which tuples were emitted in a given interval and on replay
> block
> >>> until they are available (and also hold others that may be processed
> sooner
> >>> than in the original order). It may be easier to record emitted tuples
> to a
> >>> WAL instead of reprocessing.
> >>>
> >>
> >> Ordering cannot be guaranteed but the operator would need to finish the
> >> work it is given a window within the window boundary, otherwise there
> is a
> >> chance for data loss in recovery scenarios. You could make checkpoint
> the
> >> boundary by which all pending work is completed instead of every window
> >> boundary but then downstream operators cannot rely on window level
> >> idempotency for exactly once. Something like file output operator would
> >> work but not our db kind of operator. Both options could be supported in
> >> the operator.
> >>
> >>
> >>> Regarding not emitting stragglers until the next input arrives, can
> this
> >>> not be accomplished using IdleTimeHandler?
> >>>
> >>> What is preventing the use of virtual environments?
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>>
> >>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <
> pramod@datatorrent.com <ma...@datatorrent.com>>
> >>> wrote:
> >>>
> >>>> Hi Ananth,
> >>>>
> >>>> From your explanation, it looks like the threads overall allow you to
> >>>> achieve two things. Have some sort of overall timeout if by which a
> tuple
> >>>> doesn't finish processing then it is flagged as such. Second, it
> doesn't
> >>>> block processing of subsequent tuples and you can still process them
> >>>> meeting the SLA. By checkpoint, however, I think you should try to
> have a
> >>>> resolution one way or the other for all the tuples received within the
> >>>> checkpoint period or every window boundary (see idempotency below),
> >>>> otherwise, there is a chance of data loss in case of operator
> restarts.
> >>> If
> >>>> a loss is acceptable for stragglers you could let straggler processing
> >>>> continue beyond checkpoint boundary and let them finish when they can.
> >>> You
> >>>> could support both behaviors by use of a property. Furthermore, you
> may
> >>> not
> >>>> want all threads stuck with stragglers and then you are back to square
> >>> one
> >>>> so you may need to stop processing stragglers beyond a certain thread
> >>> usage
> >>>> threshold. Is there a way to interrupt the processing of the engine?
> >>>>
> >>>> Then there is the question of idempotency. I suspect it would be
> >>> difficult
> >>>> to maintain it unless you wait for processing to finish for all tuples
> >>>> received during the window every window boundary. You may provide an
> >>> option
> >>>> for relaxing the strict guarantees for the stragglers like mentioned
> >>> above.
> >>>>
> >>>> Pramod
> >>>>
> >>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.apex@gmail.com
> <ma...@gmail.com>>
> >>> wrote:
> >>>>
> >>>>> Hello Pramod,
> >>>>>
> >>>>> Thanks for the comments. I adjusted the title of the JIRA. Here is
> >>> what I
> >>>>> was thinking for the worker pool implementation.
> >>>>>
> >>>>> - The main reason ( which I forgot to mention in the design points
> >>> below
> >>>> )
> >>>>> is that the java embedded engine allows only the thread that created
> >>> the
> >>>>> instance to execute the python logic. This is more because of the JNI
> >>>>> specification itself. Some hints here https://stackoverflow.com/ <
> https://stackoverflow.com/>
> >>>>> questions/18056347/jni-calling-java-from-c-with-multiple-threads <
> >>>>> https://stackoverflow.com/questions/18056347/jni- <
> https://stackoverflow.com/questions/18056347/jni->
> >>>> calling-java-from-c-with-
> >>>>> multiple-threads> and here http://journals.ecs.soton.ac <
> http://journals.ecs.soton.ac/>.
> >>>>> uk/java/tutorial/native1.1/implementing/sync.html <
> >>>>> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/ <
> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/>
> >>>>> implementing/sync.html>
> >>>>>
> >>>>> - This essentially means that the main operator thread will have to
> >>> call
> >>>>> the python code execution logic if the design were otherwise.
> >>>>>
> >>>>> - Since the end user can choose to can write any kind of logic
> >>> including
> >>>>> blocking I/O as part of the implementation, I did not want to stall
> the
> >>>>> operator thread for any usage pattern.
> >>>>>
> >>>>> - In fact there is only one overall interpreter in the JVM process
> >>> space
> >>>>> and the interpreter thread is just a JNI wrapper around it to account
> >>> for
> >>>>> the JNI limitations above.
> >>>>>
> >>>>> - It is for the very same reason, there is an API in the
> implementation
> >>>> to
> >>>>> support for registering Shared Modules across all of the interpreter
> >>>>> threads. Use cases for this exist when there is a global variable
> >>>> provided
> >>>>> by the underlying Python library and loading it multiple times can
> >>> cause
> >>>>> issues. Hence the API to register a shared module which can be used
> by
> >>>> all
> >>>>> of the Interpreter Threads.
> >>>>>
> >>>>> - The operator submits to a work request queue and consumes from a
> >>>>> response queue for each of the interpreter thread. There exists one
> >>>> request
> >>>>> and one response queue per interpreter thread.
> >>>>>
> >>>>> - The stragglers will get drained from the response queue for a
> >>>> previously
> >>>>> submitted request queue.
> >>>>>
> >>>>> - The other reason why I chose to implement it this ways is also for
> >>> some
> >>>>> of the use case that I foresee in the ML scoring scenarios. In fraud
> >>>>> systems, if I have a strict SLA to score a model, the main thread in
> >>> the
> >>>>> operator is not helping me implement this pattern at all. The caller
> to
> >>>> the
> >>>>> Apex application will need to proceed if the scoring gets delayed due
> >>> to
> >>>>> whatever reason. However the scoring can continue on the interpreter
> >>>> thread
> >>>>> and can be drained later ( It is just that the caller did not make
> use
> >>> of
> >>>>> this result but can still be persisted for operators consuming from
> the
> >>>>> straggler port.
> >>>>>
> >>>>> - There are 3 output ports for this operator. DefaultOutputPort,
> >>>>> stragglersPort and an errorPort.
> >>>>>
> >>>>> - Some libraries like Tensorflow can become really heavy. Tensorflow
> >>>>> models can execute a tensorflow DAG as part of a model scoring
> >>>>> implementation and hence I wanted to take the approach of a worker
> >>> pool.
> >>>>> Yes your point is valid if we wait for the stragglers to complete in
> a
> >>>>> given window. The current implementation does not force to wait for
> all
> >>>> of
> >>>>> the stragglers to complete. The stragglers are emitted only when
> there
> >>>> is a
> >>>>> new tuple that is being processed. i.e. when a new tuple arrives for
> >>>>> scoring , the straggler response queue is checked if there are any
> >>>> entries
> >>>>> and if yes, the responses are emitted into the stragglerPort. This
> >>>>> essentially means that there are situations when the straggler port
> is
> >>>>> emitting the result for a request submitted in the previous window.
> >>> This
> >>>>> also implies that idempotency cannot be guaranteed across runs of the
> >>>> same
> >>>>> input data. In fact all threaded implementations have this issue as
> >>>>> ordering of the results is not guaranteed to be unique even within a
> >>>> given
> >>>>> window ?
> >>>>>
> >>>>> I can enforce a block/drain at the end of the window to force a
> >>>> completion
> >>>>> basing on the feedback.
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>> Ananth
> >>>>>
> >>>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pramod@datatorrent.com
> <ma...@datatorrent.com>>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi Anath,
> >>>>>>
> >>>>>> Sounds interesting and looks like you have put quite a bit of work
> on
> >>>> it.
> >>>>>> Might I suggest changing the title of 2260 to better fit your
> >>> proposal
> >>>>> and
> >>>>>> implementation, mainly so that there is differentiation from 2261.
> >>>>>>
> >>>>>> I wanted to discuss the proposal to use multiple threads in an
> >>> operator
> >>>>>> instance. Unless the execution threads are blocking for some sort of
> >>>> i/o
> >>>>>> why would it result in a noticeable performance difference compared
> >>> to
> >>>>>> processing in operator thread and running multiple partitions of the
> >>>>>> operator in container local. By running the processing in a separate
> >>>>> thread
> >>>>>> from the operator lifecycle thread you don't still get away from
> >>>> matching
> >>>>>> the incoming data throughput. The checkpoint will act as a time
> where
> >>>> you
> >>>>>> backpressure will start to materialize when the operator would have
> >>> to
> >>>>> wait
> >>>>>> for your background processing to complete to guarantee all data
> till
> >>>> the
> >>>>>> checkpoint is processed.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.apex@gmail.com
> <ma...@gmail.com>>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hello All,
> >>>>>>>
> >>>>>>> I would like to submit the design for the Python execution operator
> >>>>> before
> >>>>>>> I raise the pull request so that I can refine the implementation
> >>> based
> >>>>> on
> >>>>>>> feedback. Could you please provide feedback on the design if any
> >>> and I
> >>>>> will
> >>>>>>> raise the PR accordingly.
> >>>>>>>
> >>>>>>> - This operator is for the JIRA ticket raised here
> >>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> https://issues.apache.org/jira/browse/APEXMALHAR-2260> <
> >>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> https://issues.apache.org/jira/browse/APEXMALHAR-2260>>
> >>>>>>> - The operator embeds a python interpreter in the operator JVM
> >>> process
> >>>>>>> space and is not external to the JVM.
> >>>>>>> - The implementation is proposing the use of Java Embedded Python (
> >>>> JEP
> >>>>> )
> >>>>>>> given here https://github.com/ninia/jep <
> https://github.com/ninia/jep> <
> >>> https://github.com/ninia/jep <https://github.com/ninia/jep>
> >>>>>
> >>>>>>> - The JEP engine is under zlib/libpng license. Since this is an
> >>>> approved
> >>>>>>> license under https://www.apache.org/legal/
> resolved.html#category-a
> >>> <
> >>>>>>> https://www.apache.org/legal/resolved.html#category-a> I am
> >>> assuming
> >>>> it
> >>>>>>> is ok for the community to approve the inclusion of this library
> >>>>>>> - Python integration is a messy piece due to the nature of dynamic
> >>>>>>> libraries. All python libraries need to be natively installed. This
> >>>> also
> >>>>>>> means we will not be able bundle python libraries and dependencies
> >>> as
> >>>>> part
> >>>>>>> of the build into the target JVM container. Hence this operator has
> >>>> the
> >>>>>>> current limitation of the python binaries installed through an
> >>>> external
> >>>>>>> process on all of the YARN nodes for now.
> >>>>>>> - The JEP maven dependency jar in the POM is a JNI wrapper around
> >>> the
> >>>>>>> dynamic library that is installed externally to the Apex
> >>> installation
> >>>>>>> process on all of the YARN nodes.
> >>>>>>> - Hope to take up https://issues.apache.org/
> >>> jira/browse/APEXCORE-796
> >>>> <
> >>>>>>> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
> >>>> issue
> >>>>>>> in the future.
> >>>>>>> - The python operator implementation can be extended to py4J based
> >>>>>>> implementation ( as opposed to in-memory model like JEP ) in the
> >>>> future
> >>>>> if
> >>>>>>> required be. JEP is the implementation based on an in-memory design
> >>>>> pattern.
> >>>>>>> - The python operator allows for 4 major API patterns
> >>>>>>>   - Execute a method call by accepting parameters to pass to the
> >>>>>>> interpreter
> >>>>>>>   - Execute a python script as given in a file path
> >>>>>>>   - Evaluate an expression and allows for passing of variables
> >>>> between
> >>>>>>> the java code and the python in-memory interpreter bridge
> >>>>>>>   - A handy method wherein a series of instructions can be passed
> >>> in
> >>>>> one
> >>>>>>> single java call ( executed as a sequence of python eval
> >>> instructions
> >>>>> under
> >>>>>>> the hood )
> >>>>>>> - Automatic garbage collection of the variables that are passed
> from
> >>>>> java
> >>>>>>> code to the in memory python interpreter
> >>>>>>> - Support for all major python libraries. Tensorflow, Keras,
> Scikit,
> >>>>>>> xgboost. Preliminary tests for these libraries seem to work as per
> >>>> code
> >>>>>>> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> >>>>>>> apex/apexjvmpython <https://github.com/ananthc/
> >>>>>>> sampleapps/tree/master/apache-apex/apexjvmpython>
> >>>>>>> - The implementation allows for SLA based execution model. i.e. the
> >>>>>>> operator is given a chance to execute the python code and if not
> >>>>> complete
> >>>>>>> within a time out, the operator code returns back null.
> >>>>>>> - A tuple that has become a straggler as per previous point will
> >>>>>>> automatically be drained off to a different port so that downstream
> >>>>>>> operators can still consume the straggler if they want to when the
> >>>>> results
> >>>>>>> arrive.
> >>>>>>> - Because of the nature of python being an interpreter and if a
> >>>> previous
> >>>>>>> tuple is being still processed, there is chance of a back pressure
> >>>>> pattern
> >>>>>>> building up very quickly. Hence this operator works on the concept
> >>> of
> >>>> a
> >>>>>>> worker pool. The Python operator uses a configurable number of
> >>> worker
> >>>>>>> thread each of which embed the Python interpreter within their
> >>>>> processing
> >>>>>>> space. i.e. it is in fact a collection of python ink memory
> >>>> interpreters
> >>>>>>> inside the Python operator implementation.
> >>>>>>> - The operator chooses one of the threads at runtime basing on
> their
> >>>>> busy
> >>>>>>> state thus allowing for back-pressure issues to be resolved
> >>>>> automatically.
> >>>>>>> - There is a first class support for Numpy in JEP. Java arrays
> would
> >>>> be
> >>>>>>> convertible to the Python Numpy arrays and vice versa and share the
> >>>> same
> >>>>>>> memory addresses for efficiency reasons.
> >>>>>>> - The base operator implements dynamic partitioning based on a
> >>> thread
> >>>>>>> starvation policy. At each checkpoint, it checks how much
> percentage
> >>>> of
> >>>>> the
> >>>>>>> requests resulted in starved threads and if the starvation exceeds
> a
> >>>>>>> configured percentage, a new instance of the operator is
> provisioned
> >>>> for
> >>>>>>> every such instance of the operator
> >>>>>>> - The operator provides the notion of a worker execution mode.
> There
> >>>> are
> >>>>>>> two worker modes that are passed in each of the above calls from
> the
> >>>>> user.
> >>>>>>> ALL or ANY.  Because python interpreter is state based engine, a
> >>> newly
> >>>>>>> dynamically partitioned operator might not be in the exact state of
> >>>> the
> >>>>>>> remaining operators. Hence the operator has this notion of worker
> >>>>> execution
> >>>>>>> mode. Any call ( any of the 4 calls mentioned above ) called with
> >>> ALL
> >>>>>>> execution mode will be executed on all the workers of the worker
> >>>> thread
> >>>>>>> pool as well as the dynamically portioned instance whenever such an
> >>>>>>> instance is provisioned.
> >>>>>>> - The base operator implementation has a method that can be
> >>> overridden
> >>>>> to
> >>>>>>> implement the logic that needs to be executed for each tuple. The
> >>> base
> >>>>>>> operator default implementation is a simple NO-OP.
> >>>>>>> - The operator automatically picks up the least busy of the thread
> >>>> pool
> >>>>>>> worker which has JEP embedded in it to execute the call.
> >>>>>>> - The JEP based installation will not support non Cpython modules.
> >>> All
> >>>>> of
> >>>>>>> the major python libraries are cpython based and hence I believe
> >>> this
> >>>>> is of
> >>>>>>> a lesser concern. If we hit a roadblock when a new python library
> >>>> being
> >>>>> a
> >>>>>>> non-Cpython based library needs to be run, then we could implement
> >>> the
> >>>>>>> ApexPythonEngine interface to something like Py4J which involves
> >>>>>>> interprocess communication.
> >>>>>>> - The python operator requires the user to set the library path
> >>>>>>> java.library.path for the operator to make use of the dynamic
> >>>> libraries
> >>>>> of
> >>>>>>> the corresponding platform. This has to be passed in as the JVM
> >>>> options.
> >>>>>>> Failing to do so will result in the operator failing to load the
> >>>>>>> interpreter properly.
> >>>>>>> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
> >>>> Numpy
> >>>>>> =
> >>>>>>> 1.7 is supported.
> >>>>>>> - There is no support for virtual environments yet. In case of
> >>>> multiple
> >>>>>>> python versions on the node, to include the right python version
> for
> >>>> the
> >>>>>>> apex operator, ensure that the environment variables and the
> dynamic
> >>>>>>> library path are set appropriately. This is a workaround and I hope
> >>>>>>> APEXCORE-796 will solve this issue as well.
> >>>>>>>
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Ananth
> >
>
>

Re: [Discuss] Design of the python execution operator

Posted by Ananth G <an...@gmail.com>.
I guess my comment below regarding overhead of serialisation in container local is wrong ? Nevertheless having a local thread implementation gives some benefits . For example I am using to whether sleep if there is no request in the queue or spin checking for request presence in the request queue etc to take care of no delays in the request queue processing itself.

Regards,
Ananth

> On 23 Dec 2017, at 6:50 am, Ananth G <an...@gmail.com> wrote:
> 
> 
> Thanks for the comments Thomas and Pramod. 
> 
> Apologies for the delayed response on this thread. 
> 
> > I believe the thread implementation still adds some value over a container local approach. It is more of a “thread local” equivalent which is more efficient as opposed to a container local implementation. Also the number of worker threads is configurable. Setting the value of 1 will let the user to not do this ( although I do not see a reason for why not ). There is always the over head of serialise/de-serialize cycle even for a container local approach and there is the additional possibility of container local not being honoured by the Resource manager based on the state of the resources. 
> 
> > Regarding the configurable key to ensure all tuples in a window are processed, I am adding a switch which can let the user choose ( and javadoc that clearly points out issues if not waiting for the tuples to be completely processed ). There are pros and cons for this and letting the user decide might be a better approach. The reason why I mention cons for waiting the tuples to complete ( apart from the reason that Thomas mentioned ) is that if one of the commands that the user wrote is an erroneous one, all the subsequent calls to that interpreter thread cal fail. An example use case is that tuple A set some value for variable x and tuple B that is coming next is making use of the variable x. Syntactically expression for tuple B is valid but just that it depends on variable x. Now if the variable x is not in memory because tuple A is a straggler resulting in tuple B resulting in an erroneous interpreter state. Hence the operator might stall definitely as end window will be stalled forever resulting in killing of the operator ultimately. This is also because the erroneous command corrupted the state of the interpreter itself. Of course this can happen to all of the threads in the interpreter worker pool resulting in this state as well. Perhaps an improvement of the current implementation is to detect all such stalled interpreters for more than x windows and rebuild the interpreter thread when such a situation is detected. 
> 
> > Thanks for the IdleTimeoutHandler tip as this helped me to ensure that the stragglers are drained out irrespective of a new tuple coming in for processing. In the previous iteration, the stragglers could only be drained when there is a new tuple that came in processing as delayed responses queue could only be checked when there is some activity on the main thread. 
> 
> > Thanks for raising the point about the virtual environments: This is a point I missed mentioning in the design description below. There is no support for virtual environments yet in JEP and hence the current limitation. However the work around is simple. As part of the application configuration, we need to provide the JAVA_LIBRARY_PATH which contains the path to the JEP dynamic libraries. If there are multiple python installs ( and hence multiple JEP libraries to choose from for each of the apex applications that are being deployed), setting the right path for the operator JVM will result in picking the corresponding python interpreter version. This also essentially means that we cannot have a thread local deployment configuration of two python operators that belong to different python versions in the same JVM.  The Docker approach ticket should be the right fix for virtual environments issue? https://issues.apache.org/jira/browse/APEXCORE-796 <https://issues.apache.org/jira/browse/APEXCORE-796> ( but still might not solve the thread local configuration deployment )
> 
> Regards,
> Ananth
> 
> 
>> On 21 Dec 2017, at 11:01 am, Pramod Immaneni <pramod@datatorrent.com <ma...@datatorrent.com>> wrote:
>> 
>> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <thw@apache.org <ma...@apache.org>> wrote:
>> 
>>> It is exciting to see this move forward, the ability to use Python opens
>>> many new possibilities.
>>> 
>>> Regarding use of worker threads, this is a pattern that we are using
>>> elsewhere (for example in the Kafka input operator). When the operator
>>> performs blocking operations and consumes little memory and/or CPU, then it
>>> is more economic to first use threads to increase parallelism and
>>> throughput (up to a limit), and then the more expensive containers for
>>> horizontal scaling (multiple threads to make good use of container
>>> resources and then scale using the usual partitioning).
>>> 
>> 
>> I think there is a difference. In case of kafka or other input operators
>> the threads are less constrained. They can operate with independence and
>> can dictate the pace limited only by back pressure. In this case the
>> operator is most likely going to be downsteram in the DAG and would have
>> constraints for processing guarantees. For scalability, container local
>> could also be used as a substitue for multiple threads without resorting to
>> using separate containers. I can understand use of a separate thread to be
>> able to get around problems like stalled processing but would first try to
>> see if something like container local would work for scaling.
>> 
>> 
>>> It is also correct that generally there is no ordering guarantee within a
>>> streaming window, and that would be the case when multiple input ports are
>>> present as well. (The platform cannot guarantee such ordering, this would
>>> need to be done by the operator).
>> 
>> 
>> 
>>> Idempotency can be expensive (latency and/or space complexity), and not all
>>> applications need it (like certain use cases that process record by record
>>> and don't accumulate state). An example might be Python logic that is used
>>> for scoring against a model that was built offline. Idempotency would
>>> actually be rather difficult to implement, since the operator would need to
>>> remember which tuples were emitted in a given interval and on replay block
>>> until they are available (and also hold others that may be processed sooner
>>> than in the original order). It may be easier to record emitted tuples to a
>>> WAL instead of reprocessing.
>>> 
>> 
>> Ordering cannot be guaranteed but the operator would need to finish the
>> work it is given a window within the window boundary, otherwise there is a
>> chance for data loss in recovery scenarios. You could make checkpoint the
>> boundary by which all pending work is completed instead of every window
>> boundary but then downstream operators cannot rely on window level
>> idempotency for exactly once. Something like file output operator would
>> work but not our db kind of operator. Both options could be supported in
>> the operator.
>> 
>> 
>>> Regarding not emitting stragglers until the next input arrives, can this
>>> not be accomplished using IdleTimeHandler?
>>> 
>>> What is preventing the use of virtual environments?
>>> 
>>> Thanks,
>>> Thomas
>>> 
>>> 
>>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pramod@datatorrent.com <ma...@datatorrent.com>>
>>> wrote:
>>> 
>>>> Hi Ananth,
>>>> 
>>>> From your explanation, it looks like the threads overall allow you to
>>>> achieve two things. Have some sort of overall timeout if by which a tuple
>>>> doesn't finish processing then it is flagged as such. Second, it doesn't
>>>> block processing of subsequent tuples and you can still process them
>>>> meeting the SLA. By checkpoint, however, I think you should try to have a
>>>> resolution one way or the other for all the tuples received within the
>>>> checkpoint period or every window boundary (see idempotency below),
>>>> otherwise, there is a chance of data loss in case of operator restarts.
>>> If
>>>> a loss is acceptable for stragglers you could let straggler processing
>>>> continue beyond checkpoint boundary and let them finish when they can.
>>> You
>>>> could support both behaviors by use of a property. Furthermore, you may
>>> not
>>>> want all threads stuck with stragglers and then you are back to square
>>> one
>>>> so you may need to stop processing stragglers beyond a certain thread
>>> usage
>>>> threshold. Is there a way to interrupt the processing of the engine?
>>>> 
>>>> Then there is the question of idempotency. I suspect it would be
>>> difficult
>>>> to maintain it unless you wait for processing to finish for all tuples
>>>> received during the window every window boundary. You may provide an
>>> option
>>>> for relaxing the strict guarantees for the stragglers like mentioned
>>> above.
>>>> 
>>>> Pramod
>>>> 
>>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.apex@gmail.com <ma...@gmail.com>>
>>> wrote:
>>>> 
>>>>> Hello Pramod,
>>>>> 
>>>>> Thanks for the comments. I adjusted the title of the JIRA. Here is
>>> what I
>>>>> was thinking for the worker pool implementation.
>>>>> 
>>>>> - The main reason ( which I forgot to mention in the design points
>>> below
>>>> )
>>>>> is that the java embedded engine allows only the thread that created
>>> the
>>>>> instance to execute the python logic. This is more because of the JNI
>>>>> specification itself. Some hints here https://stackoverflow.com/ <https://stackoverflow.com/>
>>>>> questions/18056347/jni-calling-java-from-c-with-multiple-threads <
>>>>> https://stackoverflow.com/questions/18056347/jni- <https://stackoverflow.com/questions/18056347/jni->
>>>> calling-java-from-c-with-
>>>>> multiple-threads> and here http://journals.ecs.soton.ac <http://journals.ecs.soton.ac/>.
>>>>> uk/java/tutorial/native1.1/implementing/sync.html <
>>>>> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/ <http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/>
>>>>> implementing/sync.html>
>>>>> 
>>>>> - This essentially means that the main operator thread will have to
>>> call
>>>>> the python code execution logic if the design were otherwise.
>>>>> 
>>>>> - Since the end user can choose to can write any kind of logic
>>> including
>>>>> blocking I/O as part of the implementation, I did not want to stall the
>>>>> operator thread for any usage pattern.
>>>>> 
>>>>> - In fact there is only one overall interpreter in the JVM process
>>> space
>>>>> and the interpreter thread is just a JNI wrapper around it to account
>>> for
>>>>> the JNI limitations above.
>>>>> 
>>>>> - It is for the very same reason, there is an API in the implementation
>>>> to
>>>>> support for registering Shared Modules across all of the interpreter
>>>>> threads. Use cases for this exist when there is a global variable
>>>> provided
>>>>> by the underlying Python library and loading it multiple times can
>>> cause
>>>>> issues. Hence the API to register a shared module which can be used by
>>>> all
>>>>> of the Interpreter Threads.
>>>>> 
>>>>> - The operator submits to a work request queue and consumes from a
>>>>> response queue for each of the interpreter thread. There exists one
>>>> request
>>>>> and one response queue per interpreter thread.
>>>>> 
>>>>> - The stragglers will get drained from the response queue for a
>>>> previously
>>>>> submitted request queue.
>>>>> 
>>>>> - The other reason why I chose to implement it this ways is also for
>>> some
>>>>> of the use case that I foresee in the ML scoring scenarios. In fraud
>>>>> systems, if I have a strict SLA to score a model, the main thread in
>>> the
>>>>> operator is not helping me implement this pattern at all. The caller to
>>>> the
>>>>> Apex application will need to proceed if the scoring gets delayed due
>>> to
>>>>> whatever reason. However the scoring can continue on the interpreter
>>>> thread
>>>>> and can be drained later ( It is just that the caller did not make use
>>> of
>>>>> this result but can still be persisted for operators consuming from the
>>>>> straggler port.
>>>>> 
>>>>> - There are 3 output ports for this operator. DefaultOutputPort,
>>>>> stragglersPort and an errorPort.
>>>>> 
>>>>> - Some libraries like Tensorflow can become really heavy. Tensorflow
>>>>> models can execute a tensorflow DAG as part of a model scoring
>>>>> implementation and hence I wanted to take the approach of a worker
>>> pool.
>>>>> Yes your point is valid if we wait for the stragglers to complete in a
>>>>> given window. The current implementation does not force to wait for all
>>>> of
>>>>> the stragglers to complete. The stragglers are emitted only when there
>>>> is a
>>>>> new tuple that is being processed. i.e. when a new tuple arrives for
>>>>> scoring , the straggler response queue is checked if there are any
>>>> entries
>>>>> and if yes, the responses are emitted into the stragglerPort. This
>>>>> essentially means that there are situations when the straggler port is
>>>>> emitting the result for a request submitted in the previous window.
>>> This
>>>>> also implies that idempotency cannot be guaranteed across runs of the
>>>> same
>>>>> input data. In fact all threaded implementations have this issue as
>>>>> ordering of the results is not guaranteed to be unique even within a
>>>> given
>>>>> window ?
>>>>> 
>>>>> I can enforce a block/drain at the end of the window to force a
>>>> completion
>>>>> basing on the feedback.
>>>>> 
>>>>> 
>>>>> Regards,
>>>>> Ananth
>>>>> 
>>>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pramod@datatorrent.com <ma...@datatorrent.com>>
>>>>> wrote:
>>>>>> 
>>>>>> Hi Anath,
>>>>>> 
>>>>>> Sounds interesting and looks like you have put quite a bit of work on
>>>> it.
>>>>>> Might I suggest changing the title of 2260 to better fit your
>>> proposal
>>>>> and
>>>>>> implementation, mainly so that there is differentiation from 2261.
>>>>>> 
>>>>>> I wanted to discuss the proposal to use multiple threads in an
>>> operator
>>>>>> instance. Unless the execution threads are blocking for some sort of
>>>> i/o
>>>>>> why would it result in a noticeable performance difference compared
>>> to
>>>>>> processing in operator thread and running multiple partitions of the
>>>>>> operator in container local. By running the processing in a separate
>>>>> thread
>>>>>> from the operator lifecycle thread you don't still get away from
>>>> matching
>>>>>> the incoming data throughput. The checkpoint will act as a time where
>>>> you
>>>>>> backpressure will start to materialize when the operator would have
>>> to
>>>>> wait
>>>>>> for your background processing to complete to guarantee all data till
>>>> the
>>>>>> checkpoint is processed.
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>>>> 
>>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.apex@gmail.com <ma...@gmail.com>>
>>>>> wrote:
>>>>>> 
>>>>>>> Hello All,
>>>>>>> 
>>>>>>> I would like to submit the design for the Python execution operator
>>>>> before
>>>>>>> I raise the pull request so that I can refine the implementation
>>> based
>>>>> on
>>>>>>> feedback. Could you please provide feedback on the design if any
>>> and I
>>>>> will
>>>>>>> raise the PR accordingly.
>>>>>>> 
>>>>>>> - This operator is for the JIRA ticket raised here
>>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <https://issues.apache.org/jira/browse/APEXMALHAR-2260> <
>>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <https://issues.apache.org/jira/browse/APEXMALHAR-2260>>
>>>>>>> - The operator embeds a python interpreter in the operator JVM
>>> process
>>>>>>> space and is not external to the JVM.
>>>>>>> - The implementation is proposing the use of Java Embedded Python (
>>>> JEP
>>>>> )
>>>>>>> given here https://github.com/ninia/jep <https://github.com/ninia/jep> <
>>> https://github.com/ninia/jep <https://github.com/ninia/jep>
>>>>> 
>>>>>>> - The JEP engine is under zlib/libpng license. Since this is an
>>>> approved
>>>>>>> license under https://www.apache.org/legal/resolved.html#category-a
>>> <
>>>>>>> https://www.apache.org/legal/resolved.html#category-a> I am
>>> assuming
>>>> it
>>>>>>> is ok for the community to approve the inclusion of this library
>>>>>>> - Python integration is a messy piece due to the nature of dynamic
>>>>>>> libraries. All python libraries need to be natively installed. This
>>>> also
>>>>>>> means we will not be able bundle python libraries and dependencies
>>> as
>>>>> part
>>>>>>> of the build into the target JVM container. Hence this operator has
>>>> the
>>>>>>> current limitation of the python binaries installed through an
>>>> external
>>>>>>> process on all of the YARN nodes for now.
>>>>>>> - The JEP maven dependency jar in the POM is a JNI wrapper around
>>> the
>>>>>>> dynamic library that is installed externally to the Apex
>>> installation
>>>>>>> process on all of the YARN nodes.
>>>>>>> - Hope to take up https://issues.apache.org/
>>> jira/browse/APEXCORE-796
>>>> <
>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
>>>> issue
>>>>>>> in the future.
>>>>>>> - The python operator implementation can be extended to py4J based
>>>>>>> implementation ( as opposed to in-memory model like JEP ) in the
>>>> future
>>>>> if
>>>>>>> required be. JEP is the implementation based on an in-memory design
>>>>> pattern.
>>>>>>> - The python operator allows for 4 major API patterns
>>>>>>>   - Execute a method call by accepting parameters to pass to the
>>>>>>> interpreter
>>>>>>>   - Execute a python script as given in a file path
>>>>>>>   - Evaluate an expression and allows for passing of variables
>>>> between
>>>>>>> the java code and the python in-memory interpreter bridge
>>>>>>>   - A handy method wherein a series of instructions can be passed
>>> in
>>>>> one
>>>>>>> single java call ( executed as a sequence of python eval
>>> instructions
>>>>> under
>>>>>>> the hood )
>>>>>>> - Automatic garbage collection of the variables that are passed from
>>>>> java
>>>>>>> code to the in memory python interpreter
>>>>>>> - Support for all major python libraries. Tensorflow, Keras, Scikit,
>>>>>>> xgboost. Preliminary tests for these libraries seem to work as per
>>>> code
>>>>>>> here : https://github.com/ananthc/sampleapps/tree/master/apache-
>>>>>>> apex/apexjvmpython <https://github.com/ananthc/
>>>>>>> sampleapps/tree/master/apache-apex/apexjvmpython>
>>>>>>> - The implementation allows for SLA based execution model. i.e. the
>>>>>>> operator is given a chance to execute the python code and if not
>>>>> complete
>>>>>>> within a time out, the operator code returns back null.
>>>>>>> - A tuple that has become a straggler as per previous point will
>>>>>>> automatically be drained off to a different port so that downstream
>>>>>>> operators can still consume the straggler if they want to when the
>>>>> results
>>>>>>> arrive.
>>>>>>> - Because of the nature of python being an interpreter and if a
>>>> previous
>>>>>>> tuple is being still processed, there is chance of a back pressure
>>>>> pattern
>>>>>>> building up very quickly. Hence this operator works on the concept
>>> of
>>>> a
>>>>>>> worker pool. The Python operator uses a configurable number of
>>> worker
>>>>>>> thread each of which embed the Python interpreter within their
>>>>> processing
>>>>>>> space. i.e. it is in fact a collection of python ink memory
>>>> interpreters
>>>>>>> inside the Python operator implementation.
>>>>>>> - The operator chooses one of the threads at runtime basing on their
>>>>> busy
>>>>>>> state thus allowing for back-pressure issues to be resolved
>>>>> automatically.
>>>>>>> - There is a first class support for Numpy in JEP. Java arrays would
>>>> be
>>>>>>> convertible to the Python Numpy arrays and vice versa and share the
>>>> same
>>>>>>> memory addresses for efficiency reasons.
>>>>>>> - The base operator implements dynamic partitioning based on a
>>> thread
>>>>>>> starvation policy. At each checkpoint, it checks how much percentage
>>>> of
>>>>> the
>>>>>>> requests resulted in starved threads and if the starvation exceeds a
>>>>>>> configured percentage, a new instance of the operator is provisioned
>>>> for
>>>>>>> every such instance of the operator
>>>>>>> - The operator provides the notion of a worker execution mode. There
>>>> are
>>>>>>> two worker modes that are passed in each of the above calls from the
>>>>> user.
>>>>>>> ALL or ANY.  Because python interpreter is state based engine, a
>>> newly
>>>>>>> dynamically partitioned operator might not be in the exact state of
>>>> the
>>>>>>> remaining operators. Hence the operator has this notion of worker
>>>>> execution
>>>>>>> mode. Any call ( any of the 4 calls mentioned above ) called with
>>> ALL
>>>>>>> execution mode will be executed on all the workers of the worker
>>>> thread
>>>>>>> pool as well as the dynamically portioned instance whenever such an
>>>>>>> instance is provisioned.
>>>>>>> - The base operator implementation has a method that can be
>>> overridden
>>>>> to
>>>>>>> implement the logic that needs to be executed for each tuple. The
>>> base
>>>>>>> operator default implementation is a simple NO-OP.
>>>>>>> - The operator automatically picks up the least busy of the thread
>>>> pool
>>>>>>> worker which has JEP embedded in it to execute the call.
>>>>>>> - The JEP based installation will not support non Cpython modules.
>>> All
>>>>> of
>>>>>>> the major python libraries are cpython based and hence I believe
>>> this
>>>>> is of
>>>>>>> a lesser concern. If we hit a roadblock when a new python library
>>>> being
>>>>> a
>>>>>>> non-Cpython based library needs to be run, then we could implement
>>> the
>>>>>>> ApexPythonEngine interface to something like Py4J which involves
>>>>>>> interprocess communication.
>>>>>>> - The python operator requires the user to set the library path
>>>>>>> java.library.path for the operator to make use of the dynamic
>>>> libraries
>>>>> of
>>>>>>> the corresponding platform. This has to be passed in as the JVM
>>>> options.
>>>>>>> Failing to do so will result in the operator failing to load the
>>>>>>> interpreter properly.
>>>>>>> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
>>>> Numpy
>>>>>> =
>>>>>>> 1.7 is supported.
>>>>>>> - There is no support for virtual environments yet. In case of
>>>> multiple
>>>>>>> python versions on the node, to include the right python version for
>>>> the
>>>>>>> apex operator, ensure that the environment variables and the dynamic
>>>>>>> library path are set appropriately. This is a workaround and I hope
>>>>>>> APEXCORE-796 will solve this issue as well.
>>>>>>> 
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Ananth
> 


Re: [Discuss] Design of the python execution operator

Posted by Ananth G <an...@gmail.com>.
Thanks for the comments Thomas and Pramod. 

Apologies for the delayed response on this thread. 

> I believe the thread implementation still adds some value over a container local approach. It is more of a “thread local” equivalent which is more efficient as opposed to a container local implementation. Also the number of worker threads is configurable. Setting the value of 1 will let the user to not do this ( although I do not see a reason for why not ). There is always the over head of serialise/de-serialize cycle even for a container local approach and there is the additional possibility of container local not being honoured by the Resource manager based on the state of the resources. 

> Regarding the configurable key to ensure all tuples in a window are processed, I am adding a switch which can let the user choose ( and javadoc that clearly points out issues if not waiting for the tuples to be completely processed ). There are pros and cons for this and letting the user decide might be a better approach. The reason why I mention cons for waiting the tuples to complete ( apart from the reason that Thomas mentioned ) is that if one of the commands that the user wrote is an erroneous one, all the subsequent calls to that interpreter thread cal fail. An example use case is that tuple A set some value for variable x and tuple B that is coming next is making use of the variable x. Syntactically expression for tuple B is valid but just that it depends on variable x. Now if the variable x is not in memory because tuple A is a straggler resulting in tuple B resulting in an erroneous interpreter state. Hence the operator might stall definitely as end window will be stalled forever resulting in killing of the operator ultimately. This is also because the erroneous command corrupted the state of the interpreter itself. Of course this can happen to all of the threads in the interpreter worker pool resulting in this state as well. Perhaps an improvement of the current implementation is to detect all such stalled interpreters for more than x windows and rebuild the interpreter thread when such a situation is detected. 

> Thanks for the IdleTimeoutHandler tip as this helped me to ensure that the stragglers are drained out irrespective of a new tuple coming in for processing. In the previous iteration, the stragglers could only be drained when there is a new tuple that came in processing as delayed responses queue could only be checked when there is some activity on the main thread. 

> Thanks for raising the point about the virtual environments: This is a point I missed mentioning in the design description below. There is no support for virtual environments yet in JEP and hence the current limitation. However the work around is simple. As part of the application configuration, we need to provide the JAVA_LIBRARY_PATH which contains the path to the JEP dynamic libraries. If there are multiple python installs ( and hence multiple JEP libraries to choose from for each of the apex applications that are being deployed), setting the right path for the operator JVM will result in picking the corresponding python interpreter version. This also essentially means that we cannot have a thread local deployment configuration of two python operators that belong to different python versions in the same JVM.  The Docker approach ticket should be the right fix for virtual environments issue? https://issues.apache.org/jira/browse/APEXCORE-796 <https://issues.apache.org/jira/browse/APEXCORE-796> ( but still might not solve the thread local configuration deployment )

Regards,
Ananth


> On 21 Dec 2017, at 11:01 am, Pramod Immaneni <pr...@datatorrent.com> wrote:
> 
> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <thw@apache.org <ma...@apache.org>> wrote:
> 
>> It is exciting to see this move forward, the ability to use Python opens
>> many new possibilities.
>> 
>> Regarding use of worker threads, this is a pattern that we are using
>> elsewhere (for example in the Kafka input operator). When the operator
>> performs blocking operations and consumes little memory and/or CPU, then it
>> is more economic to first use threads to increase parallelism and
>> throughput (up to a limit), and then the more expensive containers for
>> horizontal scaling (multiple threads to make good use of container
>> resources and then scale using the usual partitioning).
>> 
> 
> I think there is a difference. In case of kafka or other input operators
> the threads are less constrained. They can operate with independence and
> can dictate the pace limited only by back pressure. In this case the
> operator is most likely going to be downsteram in the DAG and would have
> constraints for processing guarantees. For scalability, container local
> could also be used as a substitue for multiple threads without resorting to
> using separate containers. I can understand use of a separate thread to be
> able to get around problems like stalled processing but would first try to
> see if something like container local would work for scaling.
> 
> 
>> It is also correct that generally there is no ordering guarantee within a
>> streaming window, and that would be the case when multiple input ports are
>> present as well. (The platform cannot guarantee such ordering, this would
>> need to be done by the operator).
> 
> 
> 
>> Idempotency can be expensive (latency and/or space complexity), and not all
>> applications need it (like certain use cases that process record by record
>> and don't accumulate state). An example might be Python logic that is used
>> for scoring against a model that was built offline. Idempotency would
>> actually be rather difficult to implement, since the operator would need to
>> remember which tuples were emitted in a given interval and on replay block
>> until they are available (and also hold others that may be processed sooner
>> than in the original order). It may be easier to record emitted tuples to a
>> WAL instead of reprocessing.
>> 
> 
> Ordering cannot be guaranteed but the operator would need to finish the
> work it is given a window within the window boundary, otherwise there is a
> chance for data loss in recovery scenarios. You could make checkpoint the
> boundary by which all pending work is completed instead of every window
> boundary but then downstream operators cannot rely on window level
> idempotency for exactly once. Something like file output operator would
> work but not our db kind of operator. Both options could be supported in
> the operator.
> 
> 
>> Regarding not emitting stragglers until the next input arrives, can this
>> not be accomplished using IdleTimeHandler?
>> 
>> What is preventing the use of virtual environments?
>> 
>> Thanks,
>> Thomas
>> 
>> 
>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pr...@datatorrent.com>
>> wrote:
>> 
>>> Hi Ananth,
>>> 
>>> From your explanation, it looks like the threads overall allow you to
>>> achieve two things. Have some sort of overall timeout if by which a tuple
>>> doesn't finish processing then it is flagged as such. Second, it doesn't
>>> block processing of subsequent tuples and you can still process them
>>> meeting the SLA. By checkpoint, however, I think you should try to have a
>>> resolution one way or the other for all the tuples received within the
>>> checkpoint period or every window boundary (see idempotency below),
>>> otherwise, there is a chance of data loss in case of operator restarts.
>> If
>>> a loss is acceptable for stragglers you could let straggler processing
>>> continue beyond checkpoint boundary and let them finish when they can.
>> You
>>> could support both behaviors by use of a property. Furthermore, you may
>> not
>>> want all threads stuck with stragglers and then you are back to square
>> one
>>> so you may need to stop processing stragglers beyond a certain thread
>> usage
>>> threshold. Is there a way to interrupt the processing of the engine?
>>> 
>>> Then there is the question of idempotency. I suspect it would be
>> difficult
>>> to maintain it unless you wait for processing to finish for all tuples
>>> received during the window every window boundary. You may provide an
>> option
>>> for relaxing the strict guarantees for the stragglers like mentioned
>> above.
>>> 
>>> Pramod
>>> 
>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <an...@gmail.com>
>> wrote:
>>> 
>>>> Hello Pramod,
>>>> 
>>>> Thanks for the comments. I adjusted the title of the JIRA. Here is
>> what I
>>>> was thinking for the worker pool implementation.
>>>> 
>>>> - The main reason ( which I forgot to mention in the design points
>> below
>>> )
>>>> is that the java embedded engine allows only the thread that created
>> the
>>>> instance to execute the python logic. This is more because of the JNI
>>>> specification itself. Some hints here https://stackoverflow.com/
>>>> questions/18056347/jni-calling-java-from-c-with-multiple-threads <
>>>> https://stackoverflow.com/questions/18056347/jni-
>>> calling-java-from-c-with-
>>>> multiple-threads> and here http://journals.ecs.soton.ac.
>>>> uk/java/tutorial/native1.1/implementing/sync.html <
>>>> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/
>>>> implementing/sync.html>
>>>> 
>>>> - This essentially means that the main operator thread will have to
>> call
>>>> the python code execution logic if the design were otherwise.
>>>> 
>>>> - Since the end user can choose to can write any kind of logic
>> including
>>>> blocking I/O as part of the implementation, I did not want to stall the
>>>> operator thread for any usage pattern.
>>>> 
>>>> - In fact there is only one overall interpreter in the JVM process
>> space
>>>> and the interpreter thread is just a JNI wrapper around it to account
>> for
>>>> the JNI limitations above.
>>>> 
>>>> - It is for the very same reason, there is an API in the implementation
>>> to
>>>> support for registering Shared Modules across all of the interpreter
>>>> threads. Use cases for this exist when there is a global variable
>>> provided
>>>> by the underlying Python library and loading it multiple times can
>> cause
>>>> issues. Hence the API to register a shared module which can be used by
>>> all
>>>> of the Interpreter Threads.
>>>> 
>>>> - The operator submits to a work request queue and consumes from a
>>>> response queue for each of the interpreter thread. There exists one
>>> request
>>>> and one response queue per interpreter thread.
>>>> 
>>>> - The stragglers will get drained from the response queue for a
>>> previously
>>>> submitted request queue.
>>>> 
>>>> - The other reason why I chose to implement it this ways is also for
>> some
>>>> of the use case that I foresee in the ML scoring scenarios. In fraud
>>>> systems, if I have a strict SLA to score a model, the main thread in
>> the
>>>> operator is not helping me implement this pattern at all. The caller to
>>> the
>>>> Apex application will need to proceed if the scoring gets delayed due
>> to
>>>> whatever reason. However the scoring can continue on the interpreter
>>> thread
>>>> and can be drained later ( It is just that the caller did not make use
>> of
>>>> this result but can still be persisted for operators consuming from the
>>>> straggler port.
>>>> 
>>>> - There are 3 output ports for this operator. DefaultOutputPort,
>>>> stragglersPort and an errorPort.
>>>> 
>>>> - Some libraries like Tensorflow can become really heavy. Tensorflow
>>>> models can execute a tensorflow DAG as part of a model scoring
>>>> implementation and hence I wanted to take the approach of a worker
>> pool.
>>>> Yes your point is valid if we wait for the stragglers to complete in a
>>>> given window. The current implementation does not force to wait for all
>>> of
>>>> the stragglers to complete. The stragglers are emitted only when there
>>> is a
>>>> new tuple that is being processed. i.e. when a new tuple arrives for
>>>> scoring , the straggler response queue is checked if there are any
>>> entries
>>>> and if yes, the responses are emitted into the stragglerPort. This
>>>> essentially means that there are situations when the straggler port is
>>>> emitting the result for a request submitted in the previous window.
>> This
>>>> also implies that idempotency cannot be guaranteed across runs of the
>>> same
>>>> input data. In fact all threaded implementations have this issue as
>>>> ordering of the results is not guaranteed to be unique even within a
>>> given
>>>> window ?
>>>> 
>>>> I can enforce a block/drain at the end of the window to force a
>>> completion
>>>> basing on the feedback.
>>>> 
>>>> 
>>>> Regards,
>>>> Ananth
>>>> 
>>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pr...@datatorrent.com>
>>>> wrote:
>>>>> 
>>>>> Hi Anath,
>>>>> 
>>>>> Sounds interesting and looks like you have put quite a bit of work on
>>> it.
>>>>> Might I suggest changing the title of 2260 to better fit your
>> proposal
>>>> and
>>>>> implementation, mainly so that there is differentiation from 2261.
>>>>> 
>>>>> I wanted to discuss the proposal to use multiple threads in an
>> operator
>>>>> instance. Unless the execution threads are blocking for some sort of
>>> i/o
>>>>> why would it result in a noticeable performance difference compared
>> to
>>>>> processing in operator thread and running multiple partitions of the
>>>>> operator in container local. By running the processing in a separate
>>>> thread
>>>>> from the operator lifecycle thread you don't still get away from
>>> matching
>>>>> the incoming data throughput. The checkpoint will act as a time where
>>> you
>>>>> backpressure will start to materialize when the operator would have
>> to
>>>> wait
>>>>> for your background processing to complete to guarantee all data till
>>> the
>>>>> checkpoint is processed.
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> 
>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <an...@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> Hello All,
>>>>>> 
>>>>>> I would like to submit the design for the Python execution operator
>>>> before
>>>>>> I raise the pull request so that I can refine the implementation
>> based
>>>> on
>>>>>> feedback. Could you please provide feedback on the design if any
>> and I
>>>> will
>>>>>> raise the PR accordingly.
>>>>>> 
>>>>>> - This operator is for the JIRA ticket raised here
>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
>>>>>> - The operator embeds a python interpreter in the operator JVM
>> process
>>>>>> space and is not external to the JVM.
>>>>>> - The implementation is proposing the use of Java Embedded Python (
>>> JEP
>>>> )
>>>>>> given here https://github.com/ninia/jep <
>> https://github.com/ninia/jep
>>>> 
>>>>>> - The JEP engine is under zlib/libpng license. Since this is an
>>> approved
>>>>>> license under https://www.apache.org/legal/resolved.html#category-a
>> <
>>>>>> https://www.apache.org/legal/resolved.html#category-a> I am
>> assuming
>>> it
>>>>>> is ok for the community to approve the inclusion of this library
>>>>>> - Python integration is a messy piece due to the nature of dynamic
>>>>>> libraries. All python libraries need to be natively installed. This
>>> also
>>>>>> means we will not be able bundle python libraries and dependencies
>> as
>>>> part
>>>>>> of the build into the target JVM container. Hence this operator has
>>> the
>>>>>> current limitation of the python binaries installed through an
>>> external
>>>>>> process on all of the YARN nodes for now.
>>>>>> - The JEP maven dependency jar in the POM is a JNI wrapper around
>> the
>>>>>> dynamic library that is installed externally to the Apex
>> installation
>>>>>> process on all of the YARN nodes.
>>>>>> - Hope to take up https://issues.apache.org/
>> jira/browse/APEXCORE-796
>>> <
>>>>>> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
>>> issue
>>>>>> in the future.
>>>>>> - The python operator implementation can be extended to py4J based
>>>>>> implementation ( as opposed to in-memory model like JEP ) in the
>>> future
>>>> if
>>>>>> required be. JEP is the implementation based on an in-memory design
>>>> pattern.
>>>>>> - The python operator allows for 4 major API patterns
>>>>>>   - Execute a method call by accepting parameters to pass to the
>>>>>> interpreter
>>>>>>   - Execute a python script as given in a file path
>>>>>>   - Evaluate an expression and allows for passing of variables
>>> between
>>>>>> the java code and the python in-memory interpreter bridge
>>>>>>   - A handy method wherein a series of instructions can be passed
>> in
>>>> one
>>>>>> single java call ( executed as a sequence of python eval
>> instructions
>>>> under
>>>>>> the hood )
>>>>>> - Automatic garbage collection of the variables that are passed from
>>>> java
>>>>>> code to the in memory python interpreter
>>>>>> - Support for all major python libraries. Tensorflow, Keras, Scikit,
>>>>>> xgboost. Preliminary tests for these libraries seem to work as per
>>> code
>>>>>> here : https://github.com/ananthc/sampleapps/tree/master/apache-
>>>>>> apex/apexjvmpython <https://github.com/ananthc/
>>>>>> sampleapps/tree/master/apache-apex/apexjvmpython>
>>>>>> - The implementation allows for SLA based execution model. i.e. the
>>>>>> operator is given a chance to execute the python code and if not
>>>> complete
>>>>>> within a time out, the operator code returns back null.
>>>>>> - A tuple that has become a straggler as per previous point will
>>>>>> automatically be drained off to a different port so that downstream
>>>>>> operators can still consume the straggler if they want to when the
>>>> results
>>>>>> arrive.
>>>>>> - Because of the nature of python being an interpreter and if a
>>> previous
>>>>>> tuple is being still processed, there is chance of a back pressure
>>>> pattern
>>>>>> building up very quickly. Hence this operator works on the concept
>> of
>>> a
>>>>>> worker pool. The Python operator uses a configurable number of
>> worker
>>>>>> thread each of which embed the Python interpreter within their
>>>> processing
>>>>>> space. i.e. it is in fact a collection of python ink memory
>>> interpreters
>>>>>> inside the Python operator implementation.
>>>>>> - The operator chooses one of the threads at runtime basing on their
>>>> busy
>>>>>> state thus allowing for back-pressure issues to be resolved
>>>> automatically.
>>>>>> - There is a first class support for Numpy in JEP. Java arrays would
>>> be
>>>>>> convertible to the Python Numpy arrays and vice versa and share the
>>> same
>>>>>> memory addresses for efficiency reasons.
>>>>>> - The base operator implements dynamic partitioning based on a
>> thread
>>>>>> starvation policy. At each checkpoint, it checks how much percentage
>>> of
>>>> the
>>>>>> requests resulted in starved threads and if the starvation exceeds a
>>>>>> configured percentage, a new instance of the operator is provisioned
>>> for
>>>>>> every such instance of the operator
>>>>>> - The operator provides the notion of a worker execution mode. There
>>> are
>>>>>> two worker modes that are passed in each of the above calls from the
>>>> user.
>>>>>> ALL or ANY.  Because python interpreter is state based engine, a
>> newly
>>>>>> dynamically partitioned operator might not be in the exact state of
>>> the
>>>>>> remaining operators. Hence the operator has this notion of worker
>>>> execution
>>>>>> mode. Any call ( any of the 4 calls mentioned above ) called with
>> ALL
>>>>>> execution mode will be executed on all the workers of the worker
>>> thread
>>>>>> pool as well as the dynamically portioned instance whenever such an
>>>>>> instance is provisioned.
>>>>>> - The base operator implementation has a method that can be
>> overridden
>>>> to
>>>>>> implement the logic that needs to be executed for each tuple. The
>> base
>>>>>> operator default implementation is a simple NO-OP.
>>>>>> - The operator automatically picks up the least busy of the thread
>>> pool
>>>>>> worker which has JEP embedded in it to execute the call.
>>>>>> - The JEP based installation will not support non Cpython modules.
>> All
>>>> of
>>>>>> the major python libraries are cpython based and hence I believe
>> this
>>>> is of
>>>>>> a lesser concern. If we hit a roadblock when a new python library
>>> being
>>>> a
>>>>>> non-Cpython based library needs to be run, then we could implement
>> the
>>>>>> ApexPythonEngine interface to something like Py4J which involves
>>>>>> interprocess communication.
>>>>>> - The python operator requires the user to set the library path
>>>>>> java.library.path for the operator to make use of the dynamic
>>> libraries
>>>> of
>>>>>> the corresponding platform. This has to be passed in as the JVM
>>> options.
>>>>>> Failing to do so will result in the operator failing to load the
>>>>>> interpreter properly.
>>>>>> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
>>> Numpy
>>>>> =
>>>>>> 1.7 is supported.
>>>>>> - There is no support for virtual environments yet. In case of
>>> multiple
>>>>>> python versions on the node, to include the right python version for
>>> the
>>>>>> apex operator, ensure that the environment variables and the dynamic
>>>>>> library path are set appropriately. This is a workaround and I hope
>>>>>> APEXCORE-796 will solve this issue as well.
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> Ananth


Re: [Discuss] Design of the python execution operator

Posted by Pramod Immaneni <pr...@datatorrent.com>.
On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <th...@apache.org> wrote:

> It is exciting to see this move forward, the ability to use Python opens
> many new possibilities.
>
> Regarding use of worker threads, this is a pattern that we are using
> elsewhere (for example in the Kafka input operator). When the operator
> performs blocking operations and consumes little memory and/or CPU, then it
> is more economic to first use threads to increase parallelism and
> throughput (up to a limit), and then the more expensive containers for
> horizontal scaling (multiple threads to make good use of container
> resources and then scale using the usual partitioning).
>

I think there is a difference. In case of kafka or other input operators
the threads are less constrained. They can operate with independence and
can dictate the pace limited only by back pressure. In this case the
operator is most likely going to be downsteram in the DAG and would have
constraints for processing guarantees. For scalability, container local
could also be used as a substitue for multiple threads without resorting to
using separate containers. I can understand use of a separate thread to be
able to get around problems like stalled processing but would first try to
see if something like container local would work for scaling.


> It is also correct that generally there is no ordering guarantee within a
> streaming window, and that would be the case when multiple input ports are
> present as well. (The platform cannot guarantee such ordering, this would
> need to be done by the operator).



> Idempotency can be expensive (latency and/or space complexity), and not all
> applications need it (like certain use cases that process record by record
> and don't accumulate state). An example might be Python logic that is used
> for scoring against a model that was built offline. Idempotency would
> actually be rather difficult to implement, since the operator would need to
> remember which tuples were emitted in a given interval and on replay block
> until they are available (and also hold others that may be processed sooner
> than in the original order). It may be easier to record emitted tuples to a
> WAL instead of reprocessing.
>

Ordering cannot be guaranteed but the operator would need to finish the
work it is given a window within the window boundary, otherwise there is a
chance for data loss in recovery scenarios. You could make checkpoint the
boundary by which all pending work is completed instead of every window
boundary but then downstream operators cannot rely on window level
idempotency for exactly once. Something like file output operator would
work but not our db kind of operator. Both options could be supported in
the operator.


> Regarding not emitting stragglers until the next input arrives, can this
> not be accomplished using IdleTimeHandler?
>
> What is preventing the use of virtual environments?
>
> Thanks,
> Thomas
>
>
> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pr...@datatorrent.com>
> wrote:
>
> > Hi Ananth,
> >
> > From your explanation, it looks like the threads overall allow you to
> > achieve two things. Have some sort of overall timeout if by which a tuple
> > doesn't finish processing then it is flagged as such. Second, it doesn't
> > block processing of subsequent tuples and you can still process them
> > meeting the SLA. By checkpoint, however, I think you should try to have a
> > resolution one way or the other for all the tuples received within the
> > checkpoint period or every window boundary (see idempotency below),
> > otherwise, there is a chance of data loss in case of operator restarts.
> If
> > a loss is acceptable for stragglers you could let straggler processing
> > continue beyond checkpoint boundary and let them finish when they can.
> You
> > could support both behaviors by use of a property. Furthermore, you may
> not
> > want all threads stuck with stragglers and then you are back to square
> one
> > so you may need to stop processing stragglers beyond a certain thread
> usage
> > threshold. Is there a way to interrupt the processing of the engine?
> >
> > Then there is the question of idempotency. I suspect it would be
> difficult
> > to maintain it unless you wait for processing to finish for all tuples
> > received during the window every window boundary. You may provide an
> option
> > for relaxing the strict guarantees for the stragglers like mentioned
> above.
> >
> > Pramod
> >
> > On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <an...@gmail.com>
> wrote:
> >
> > > Hello Pramod,
> > >
> > > Thanks for the comments. I adjusted the title of the JIRA. Here is
> what I
> > > was thinking for the worker pool implementation.
> > >
> > > - The main reason ( which I forgot to mention in the design points
> below
> > )
> > > is that the java embedded engine allows only the thread that created
> the
> > > instance to execute the python logic. This is more because of the JNI
> > > specification itself. Some hints here https://stackoverflow.com/
> > > questions/18056347/jni-calling-java-from-c-with-multiple-threads <
> > > https://stackoverflow.com/questions/18056347/jni-
> > calling-java-from-c-with-
> > > multiple-threads> and here http://journals.ecs.soton.ac.
> > > uk/java/tutorial/native1.1/implementing/sync.html <
> > > http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/
> > > implementing/sync.html>
> > >
> > > - This essentially means that the main operator thread will have to
> call
> > > the python code execution logic if the design were otherwise.
> > >
> > > - Since the end user can choose to can write any kind of logic
> including
> > > blocking I/O as part of the implementation, I did not want to stall the
> > > operator thread for any usage pattern.
> > >
> > > - In fact there is only one overall interpreter in the JVM process
> space
> > > and the interpreter thread is just a JNI wrapper around it to account
> for
> > > the JNI limitations above.
> > >
> > > - It is for the very same reason, there is an API in the implementation
> > to
> > > support for registering Shared Modules across all of the interpreter
> > > threads. Use cases for this exist when there is a global variable
> > provided
> > > by the underlying Python library and loading it multiple times can
> cause
> > > issues. Hence the API to register a shared module which can be used by
> > all
> > > of the Interpreter Threads.
> > >
> > > - The operator submits to a work request queue and consumes from a
> > > response queue for each of the interpreter thread. There exists one
> > request
> > > and one response queue per interpreter thread.
> > >
> > > - The stragglers will get drained from the response queue for a
> > previously
> > > submitted request queue.
> > >
> > > - The other reason why I chose to implement it this ways is also for
> some
> > > of the use case that I foresee in the ML scoring scenarios. In fraud
> > > systems, if I have a strict SLA to score a model, the main thread in
> the
> > > operator is not helping me implement this pattern at all. The caller to
> > the
> > > Apex application will need to proceed if the scoring gets delayed due
> to
> > > whatever reason. However the scoring can continue on the interpreter
> > thread
> > > and can be drained later ( It is just that the caller did not make use
> of
> > > this result but can still be persisted for operators consuming from the
> > > straggler port.
> > >
> > > - There are 3 output ports for this operator. DefaultOutputPort,
> > > stragglersPort and an errorPort.
> > >
> > > - Some libraries like Tensorflow can become really heavy. Tensorflow
> > > models can execute a tensorflow DAG as part of a model scoring
> > > implementation and hence I wanted to take the approach of a worker
> pool.
> > > Yes your point is valid if we wait for the stragglers to complete in a
> > > given window. The current implementation does not force to wait for all
> > of
> > > the stragglers to complete. The stragglers are emitted only when there
> > is a
> > > new tuple that is being processed. i.e. when a new tuple arrives for
> > > scoring , the straggler response queue is checked if there are any
> > entries
> > > and if yes, the responses are emitted into the stragglerPort. This
> > > essentially means that there are situations when the straggler port is
> > > emitting the result for a request submitted in the previous window.
> This
> > > also implies that idempotency cannot be guaranteed across runs of the
> > same
> > > input data. In fact all threaded implementations have this issue as
> > > ordering of the results is not guaranteed to be unique even within a
> > given
> > > window ?
> > >
> > > I can enforce a block/drain at the end of the window to force a
> > completion
> > > basing on the feedback.
> > >
> > >
> > > Regards,
> > > Ananth
> > >
> > > > On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pr...@datatorrent.com>
> > > wrote:
> > > >
> > > > Hi Anath,
> > > >
> > > > Sounds interesting and looks like you have put quite a bit of work on
> > it.
> > > > Might I suggest changing the title of 2260 to better fit your
> proposal
> > > and
> > > > implementation, mainly so that there is differentiation from 2261.
> > > >
> > > > I wanted to discuss the proposal to use multiple threads in an
> operator
> > > > instance. Unless the execution threads are blocking for some sort of
> > i/o
> > > > why would it result in a noticeable performance difference compared
> to
> > > > processing in operator thread and running multiple partitions of the
> > > > operator in container local. By running the processing in a separate
> > > thread
> > > > from the operator lifecycle thread you don't still get away from
> > matching
> > > > the incoming data throughput. The checkpoint will act as a time where
> > you
> > > > backpressure will start to materialize when the operator would have
> to
> > > wait
> > > > for your background processing to complete to guarantee all data till
> > the
> > > > checkpoint is processed.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <an...@gmail.com>
> > > wrote:
> > > >
> > > >> Hello All,
> > > >>
> > > >> I would like to submit the design for the Python execution operator
> > > before
> > > >> I raise the pull request so that I can refine the implementation
> based
> > > on
> > > >> feedback. Could you please provide feedback on the design if any
> and I
> > > will
> > > >> raise the PR accordingly.
> > > >>
> > > >> - This operator is for the JIRA ticket raised here
> > > >> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> > > >> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
> > > >> - The operator embeds a python interpreter in the operator JVM
> process
> > > >> space and is not external to the JVM.
> > > >> - The implementation is proposing the use of Java Embedded Python (
> > JEP
> > > )
> > > >> given here https://github.com/ninia/jep <
> https://github.com/ninia/jep
> > >
> > > >> - The JEP engine is under zlib/libpng license. Since this is an
> > approved
> > > >> license under https://www.apache.org/legal/resolved.html#category-a
> <
> > > >> https://www.apache.org/legal/resolved.html#category-a> I am
> assuming
> > it
> > > >> is ok for the community to approve the inclusion of this library
> > > >> - Python integration is a messy piece due to the nature of dynamic
> > > >> libraries. All python libraries need to be natively installed. This
> > also
> > > >> means we will not be able bundle python libraries and dependencies
> as
> > > part
> > > >> of the build into the target JVM container. Hence this operator has
> > the
> > > >> current limitation of the python binaries installed through an
> > external
> > > >> process on all of the YARN nodes for now.
> > > >> - The JEP maven dependency jar in the POM is a JNI wrapper around
> the
> > > >> dynamic library that is installed externally to the Apex
> installation
> > > >> process on all of the YARN nodes.
> > > >> - Hope to take up https://issues.apache.org/
> jira/browse/APEXCORE-796
> > <
> > > >> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
> > issue
> > > >> in the future.
> > > >> - The python operator implementation can be extended to py4J based
> > > >> implementation ( as opposed to in-memory model like JEP ) in the
> > future
> > > if
> > > >> required be. JEP is the implementation based on an in-memory design
> > > pattern.
> > > >> - The python operator allows for 4 major API patterns
> > > >>    - Execute a method call by accepting parameters to pass to the
> > > >> interpreter
> > > >>    - Execute a python script as given in a file path
> > > >>    - Evaluate an expression and allows for passing of variables
> > between
> > > >> the java code and the python in-memory interpreter bridge
> > > >>    - A handy method wherein a series of instructions can be passed
> in
> > > one
> > > >> single java call ( executed as a sequence of python eval
> instructions
> > > under
> > > >> the hood )
> > > >> - Automatic garbage collection of the variables that are passed from
> > > java
> > > >> code to the in memory python interpreter
> > > >> - Support for all major python libraries. Tensorflow, Keras, Scikit,
> > > >> xgboost. Preliminary tests for these libraries seem to work as per
> > code
> > > >> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> > > >> apex/apexjvmpython <https://github.com/ananthc/
> > > >> sampleapps/tree/master/apache-apex/apexjvmpython>
> > > >> - The implementation allows for SLA based execution model. i.e. the
> > > >> operator is given a chance to execute the python code and if not
> > > complete
> > > >> within a time out, the operator code returns back null.
> > > >> - A tuple that has become a straggler as per previous point will
> > > >> automatically be drained off to a different port so that downstream
> > > >> operators can still consume the straggler if they want to when the
> > > results
> > > >> arrive.
> > > >> - Because of the nature of python being an interpreter and if a
> > previous
> > > >> tuple is being still processed, there is chance of a back pressure
> > > pattern
> > > >> building up very quickly. Hence this operator works on the concept
> of
> > a
> > > >> worker pool. The Python operator uses a configurable number of
> worker
> > > >> thread each of which embed the Python interpreter within their
> > > processing
> > > >> space. i.e. it is in fact a collection of python ink memory
> > interpreters
> > > >> inside the Python operator implementation.
> > > >> - The operator chooses one of the threads at runtime basing on their
> > > busy
> > > >> state thus allowing for back-pressure issues to be resolved
> > > automatically.
> > > >> - There is a first class support for Numpy in JEP. Java arrays would
> > be
> > > >> convertible to the Python Numpy arrays and vice versa and share the
> > same
> > > >> memory addresses for efficiency reasons.
> > > >> - The base operator implements dynamic partitioning based on a
> thread
> > > >> starvation policy. At each checkpoint, it checks how much percentage
> > of
> > > the
> > > >> requests resulted in starved threads and if the starvation exceeds a
> > > >> configured percentage, a new instance of the operator is provisioned
> > for
> > > >> every such instance of the operator
> > > >> - The operator provides the notion of a worker execution mode. There
> > are
> > > >> two worker modes that are passed in each of the above calls from the
> > > user.
> > > >> ALL or ANY.  Because python interpreter is state based engine, a
> newly
> > > >> dynamically partitioned operator might not be in the exact state of
> > the
> > > >> remaining operators. Hence the operator has this notion of worker
> > > execution
> > > >> mode. Any call ( any of the 4 calls mentioned above ) called with
> ALL
> > > >> execution mode will be executed on all the workers of the worker
> > thread
> > > >> pool as well as the dynamically portioned instance whenever such an
> > > >> instance is provisioned.
> > > >> - The base operator implementation has a method that can be
> overridden
> > > to
> > > >> implement the logic that needs to be executed for each tuple. The
> base
> > > >> operator default implementation is a simple NO-OP.
> > > >> - The operator automatically picks up the least busy of the thread
> > pool
> > > >> worker which has JEP embedded in it to execute the call.
> > > >> - The JEP based installation will not support non Cpython modules.
> All
> > > of
> > > >> the major python libraries are cpython based and hence I believe
> this
> > > is of
> > > >> a lesser concern. If we hit a roadblock when a new python library
> > being
> > > a
> > > >> non-Cpython based library needs to be run, then we could implement
> the
> > > >> ApexPythonEngine interface to something like Py4J which involves
> > > >> interprocess communication.
> > > >> - The python operator requires the user to set the library path
> > > >> java.library.path for the operator to make use of the dynamic
> > libraries
> > > of
> > > >> the corresponding platform. This has to be passed in as the JVM
> > options.
> > > >> Failing to do so will result in the operator failing to load the
> > > >> interpreter properly.
> > > >> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
> > Numpy
> > > >=
> > > >> 1.7 is supported.
> > > >> - There is no support for virtual environments yet. In case of
> > multiple
> > > >> python versions on the node, to include the right python version for
> > the
> > > >> apex operator, ensure that the environment variables and the dynamic
> > > >> library path are set appropriately. This is a workaround and I hope
> > > >> APEXCORE-796 will solve this issue as well.
> > > >>
> > > >>
> > > >> Regards,
> > > >> Ananth
> > > >>
> > > >>
> > >
> > >
> >
>

Re: [Discuss] Design of the python execution operator

Posted by Thomas Weise <th...@apache.org>.
It is exciting to see this move forward, the ability to use Python opens
many new possibilities.

Regarding use of worker threads, this is a pattern that we are using
elsewhere (for example in the Kafka input operator). When the operator
performs blocking operations and consumes little memory and/or CPU, then it
is more economic to first use threads to increase parallelism and
throughput (up to a limit), and then the more expensive containers for
horizontal scaling (multiple threads to make good use of container
resources and then scale using the usual partitioning).

It is also correct that generally there is no ordering guarantee within a
streaming window, and that would be the case when multiple input ports are
present as well. (The platform cannot guarantee such ordering, this would
need to be done by the operator).

Idempotency can be expensive (latency and/or space complexity), and not all
applications need it (like certain use cases that process record by record
and don't accumulate state). An example might be Python logic that is used
for scoring against a model that was built offline. Idempotency would
actually be rather difficult to implement, since the operator would need to
remember which tuples were emitted in a given interval and on replay block
until they are available (and also hold others that may be processed sooner
than in the original order). It may be easier to record emitted tuples to a
WAL instead of reprocessing.

Regarding not emitting stragglers until the next input arrives, can this
not be accomplished using IdleTimeHandler?

What is preventing the use of virtual environments?

Thanks,
Thomas


On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Hi Ananth,
>
> From your explanation, it looks like the threads overall allow you to
> achieve two things. Have some sort of overall timeout if by which a tuple
> doesn't finish processing then it is flagged as such. Second, it doesn't
> block processing of subsequent tuples and you can still process them
> meeting the SLA. By checkpoint, however, I think you should try to have a
> resolution one way or the other for all the tuples received within the
> checkpoint period or every window boundary (see idempotency below),
> otherwise, there is a chance of data loss in case of operator restarts. If
> a loss is acceptable for stragglers you could let straggler processing
> continue beyond checkpoint boundary and let them finish when they can. You
> could support both behaviors by use of a property. Furthermore, you may not
> want all threads stuck with stragglers and then you are back to square one
> so you may need to stop processing stragglers beyond a certain thread usage
> threshold. Is there a way to interrupt the processing of the engine?
>
> Then there is the question of idempotency. I suspect it would be difficult
> to maintain it unless you wait for processing to finish for all tuples
> received during the window every window boundary. You may provide an option
> for relaxing the strict guarantees for the stragglers like mentioned above.
>
> Pramod
>
> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <an...@gmail.com> wrote:
>
> > Hello Pramod,
> >
> > Thanks for the comments. I adjusted the title of the JIRA. Here is what I
> > was thinking for the worker pool implementation.
> >
> > - The main reason ( which I forgot to mention in the design points below
> )
> > is that the java embedded engine allows only the thread that created the
> > instance to execute the python logic. This is more because of the JNI
> > specification itself. Some hints here https://stackoverflow.com/
> > questions/18056347/jni-calling-java-from-c-with-multiple-threads <
> > https://stackoverflow.com/questions/18056347/jni-
> calling-java-from-c-with-
> > multiple-threads> and here http://journals.ecs.soton.ac.
> > uk/java/tutorial/native1.1/implementing/sync.html <
> > http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/
> > implementing/sync.html>
> >
> > - This essentially means that the main operator thread will have to call
> > the python code execution logic if the design were otherwise.
> >
> > - Since the end user can choose to can write any kind of logic including
> > blocking I/O as part of the implementation, I did not want to stall the
> > operator thread for any usage pattern.
> >
> > - In fact there is only one overall interpreter in the JVM process space
> > and the interpreter thread is just a JNI wrapper around it to account for
> > the JNI limitations above.
> >
> > - It is for the very same reason, there is an API in the implementation
> to
> > support for registering Shared Modules across all of the interpreter
> > threads. Use cases for this exist when there is a global variable
> provided
> > by the underlying Python library and loading it multiple times can cause
> > issues. Hence the API to register a shared module which can be used by
> all
> > of the Interpreter Threads.
> >
> > - The operator submits to a work request queue and consumes from a
> > response queue for each of the interpreter thread. There exists one
> request
> > and one response queue per interpreter thread.
> >
> > - The stragglers will get drained from the response queue for a
> previously
> > submitted request queue.
> >
> > - The other reason why I chose to implement it this ways is also for some
> > of the use case that I foresee in the ML scoring scenarios. In fraud
> > systems, if I have a strict SLA to score a model, the main thread in the
> > operator is not helping me implement this pattern at all. The caller to
> the
> > Apex application will need to proceed if the scoring gets delayed due to
> > whatever reason. However the scoring can continue on the interpreter
> thread
> > and can be drained later ( It is just that the caller did not make use of
> > this result but can still be persisted for operators consuming from the
> > straggler port.
> >
> > - There are 3 output ports for this operator. DefaultOutputPort,
> > stragglersPort and an errorPort.
> >
> > - Some libraries like Tensorflow can become really heavy. Tensorflow
> > models can execute a tensorflow DAG as part of a model scoring
> > implementation and hence I wanted to take the approach of a worker pool.
> > Yes your point is valid if we wait for the stragglers to complete in a
> > given window. The current implementation does not force to wait for all
> of
> > the stragglers to complete. The stragglers are emitted only when there
> is a
> > new tuple that is being processed. i.e. when a new tuple arrives for
> > scoring , the straggler response queue is checked if there are any
> entries
> > and if yes, the responses are emitted into the stragglerPort. This
> > essentially means that there are situations when the straggler port is
> > emitting the result for a request submitted in the previous window. This
> > also implies that idempotency cannot be guaranteed across runs of the
> same
> > input data. In fact all threaded implementations have this issue as
> > ordering of the results is not guaranteed to be unique even within a
> given
> > window ?
> >
> > I can enforce a block/drain at the end of the window to force a
> completion
> > basing on the feedback.
> >
> >
> > Regards,
> > Ananth
> >
> > > On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pr...@datatorrent.com>
> > wrote:
> > >
> > > Hi Anath,
> > >
> > > Sounds interesting and looks like you have put quite a bit of work on
> it.
> > > Might I suggest changing the title of 2260 to better fit your proposal
> > and
> > > implementation, mainly so that there is differentiation from 2261.
> > >
> > > I wanted to discuss the proposal to use multiple threads in an operator
> > > instance. Unless the execution threads are blocking for some sort of
> i/o
> > > why would it result in a noticeable performance difference compared to
> > > processing in operator thread and running multiple partitions of the
> > > operator in container local. By running the processing in a separate
> > thread
> > > from the operator lifecycle thread you don't still get away from
> matching
> > > the incoming data throughput. The checkpoint will act as a time where
> you
> > > backpressure will start to materialize when the operator would have to
> > wait
> > > for your background processing to complete to guarantee all data till
> the
> > > checkpoint is processed.
> > >
> > > Thanks
> > >
> > >
> > > On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <an...@gmail.com>
> > wrote:
> > >
> > >> Hello All,
> > >>
> > >> I would like to submit the design for the Python execution operator
> > before
> > >> I raise the pull request so that I can refine the implementation based
> > on
> > >> feedback. Could you please provide feedback on the design if any and I
> > will
> > >> raise the PR accordingly.
> > >>
> > >> - This operator is for the JIRA ticket raised here
> > >> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> > >> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
> > >> - The operator embeds a python interpreter in the operator JVM process
> > >> space and is not external to the JVM.
> > >> - The implementation is proposing the use of Java Embedded Python (
> JEP
> > )
> > >> given here https://github.com/ninia/jep <https://github.com/ninia/jep
> >
> > >> - The JEP engine is under zlib/libpng license. Since this is an
> approved
> > >> license under https://www.apache.org/legal/resolved.html#category-a <
> > >> https://www.apache.org/legal/resolved.html#category-a> I am assuming
> it
> > >> is ok for the community to approve the inclusion of this library
> > >> - Python integration is a messy piece due to the nature of dynamic
> > >> libraries. All python libraries need to be natively installed. This
> also
> > >> means we will not be able bundle python libraries and dependencies as
> > part
> > >> of the build into the target JVM container. Hence this operator has
> the
> > >> current limitation of the python binaries installed through an
> external
> > >> process on all of the YARN nodes for now.
> > >> - The JEP maven dependency jar in the POM is a JNI wrapper around the
> > >> dynamic library that is installed externally to the Apex installation
> > >> process on all of the YARN nodes.
> > >> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796
> <
> > >> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
> issue
> > >> in the future.
> > >> - The python operator implementation can be extended to py4J based
> > >> implementation ( as opposed to in-memory model like JEP ) in the
> future
> > if
> > >> required be. JEP is the implementation based on an in-memory design
> > pattern.
> > >> - The python operator allows for 4 major API patterns
> > >>    - Execute a method call by accepting parameters to pass to the
> > >> interpreter
> > >>    - Execute a python script as given in a file path
> > >>    - Evaluate an expression and allows for passing of variables
> between
> > >> the java code and the python in-memory interpreter bridge
> > >>    - A handy method wherein a series of instructions can be passed in
> > one
> > >> single java call ( executed as a sequence of python eval instructions
> > under
> > >> the hood )
> > >> - Automatic garbage collection of the variables that are passed from
> > java
> > >> code to the in memory python interpreter
> > >> - Support for all major python libraries. Tensorflow, Keras, Scikit,
> > >> xgboost. Preliminary tests for these libraries seem to work as per
> code
> > >> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> > >> apex/apexjvmpython <https://github.com/ananthc/
> > >> sampleapps/tree/master/apache-apex/apexjvmpython>
> > >> - The implementation allows for SLA based execution model. i.e. the
> > >> operator is given a chance to execute the python code and if not
> > complete
> > >> within a time out, the operator code returns back null.
> > >> - A tuple that has become a straggler as per previous point will
> > >> automatically be drained off to a different port so that downstream
> > >> operators can still consume the straggler if they want to when the
> > results
> > >> arrive.
> > >> - Because of the nature of python being an interpreter and if a
> previous
> > >> tuple is being still processed, there is chance of a back pressure
> > pattern
> > >> building up very quickly. Hence this operator works on the concept of
> a
> > >> worker pool. The Python operator uses a configurable number of worker
> > >> thread each of which embed the Python interpreter within their
> > processing
> > >> space. i.e. it is in fact a collection of python ink memory
> interpreters
> > >> inside the Python operator implementation.
> > >> - The operator chooses one of the threads at runtime basing on their
> > busy
> > >> state thus allowing for back-pressure issues to be resolved
> > automatically.
> > >> - There is a first class support for Numpy in JEP. Java arrays would
> be
> > >> convertible to the Python Numpy arrays and vice versa and share the
> same
> > >> memory addresses for efficiency reasons.
> > >> - The base operator implements dynamic partitioning based on a thread
> > >> starvation policy. At each checkpoint, it checks how much percentage
> of
> > the
> > >> requests resulted in starved threads and if the starvation exceeds a
> > >> configured percentage, a new instance of the operator is provisioned
> for
> > >> every such instance of the operator
> > >> - The operator provides the notion of a worker execution mode. There
> are
> > >> two worker modes that are passed in each of the above calls from the
> > user.
> > >> ALL or ANY.  Because python interpreter is state based engine, a newly
> > >> dynamically partitioned operator might not be in the exact state of
> the
> > >> remaining operators. Hence the operator has this notion of worker
> > execution
> > >> mode. Any call ( any of the 4 calls mentioned above ) called with ALL
> > >> execution mode will be executed on all the workers of the worker
> thread
> > >> pool as well as the dynamically portioned instance whenever such an
> > >> instance is provisioned.
> > >> - The base operator implementation has a method that can be overridden
> > to
> > >> implement the logic that needs to be executed for each tuple. The base
> > >> operator default implementation is a simple NO-OP.
> > >> - The operator automatically picks up the least busy of the thread
> pool
> > >> worker which has JEP embedded in it to execute the call.
> > >> - The JEP based installation will not support non Cpython modules. All
> > of
> > >> the major python libraries are cpython based and hence I believe this
> > is of
> > >> a lesser concern. If we hit a roadblock when a new python library
> being
> > a
> > >> non-Cpython based library needs to be run, then we could implement the
> > >> ApexPythonEngine interface to something like Py4J which involves
> > >> interprocess communication.
> > >> - The python operator requires the user to set the library path
> > >> java.library.path for the operator to make use of the dynamic
> libraries
> > of
> > >> the corresponding platform. This has to be passed in as the JVM
> options.
> > >> Failing to do so will result in the operator failing to load the
> > >> interpreter properly.
> > >> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
> Numpy
> > >=
> > >> 1.7 is supported.
> > >> - There is no support for virtual environments yet. In case of
> multiple
> > >> python versions on the node, to include the right python version for
> the
> > >> apex operator, ensure that the environment variables and the dynamic
> > >> library path are set appropriately. This is a workaround and I hope
> > >> APEXCORE-796 will solve this issue as well.
> > >>
> > >>
> > >> Regards,
> > >> Ananth
> > >>
> > >>
> >
> >
>

Re: [Discuss] Design of the python execution operator

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Hi Ananth,

From your explanation, it looks like the threads overall allow you to
achieve two things. Have some sort of overall timeout if by which a tuple
doesn't finish processing then it is flagged as such. Second, it doesn't
block processing of subsequent tuples and you can still process them
meeting the SLA. By checkpoint, however, I think you should try to have a
resolution one way or the other for all the tuples received within the
checkpoint period or every window boundary (see idempotency below),
otherwise, there is a chance of data loss in case of operator restarts. If
a loss is acceptable for stragglers you could let straggler processing
continue beyond checkpoint boundary and let them finish when they can. You
could support both behaviors by use of a property. Furthermore, you may not
want all threads stuck with stragglers and then you are back to square one
so you may need to stop processing stragglers beyond a certain thread usage
threshold. Is there a way to interrupt the processing of the engine?

Then there is the question of idempotency. I suspect it would be difficult
to maintain it unless you wait for processing to finish for all tuples
received during the window every window boundary. You may provide an option
for relaxing the strict guarantees for the stragglers like mentioned above.

Pramod

On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <an...@gmail.com> wrote:

> Hello Pramod,
>
> Thanks for the comments. I adjusted the title of the JIRA. Here is what I
> was thinking for the worker pool implementation.
>
> - The main reason ( which I forgot to mention in the design points below )
> is that the java embedded engine allows only the thread that created the
> instance to execute the python logic. This is more because of the JNI
> specification itself. Some hints here https://stackoverflow.com/
> questions/18056347/jni-calling-java-from-c-with-multiple-threads <
> https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-
> multiple-threads> and here http://journals.ecs.soton.ac.
> uk/java/tutorial/native1.1/implementing/sync.html <
> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/
> implementing/sync.html>
>
> - This essentially means that the main operator thread will have to call
> the python code execution logic if the design were otherwise.
>
> - Since the end user can choose to can write any kind of logic including
> blocking I/O as part of the implementation, I did not want to stall the
> operator thread for any usage pattern.
>
> - In fact there is only one overall interpreter in the JVM process space
> and the interpreter thread is just a JNI wrapper around it to account for
> the JNI limitations above.
>
> - It is for the very same reason, there is an API in the implementation to
> support for registering Shared Modules across all of the interpreter
> threads. Use cases for this exist when there is a global variable provided
> by the underlying Python library and loading it multiple times can cause
> issues. Hence the API to register a shared module which can be used by all
> of the Interpreter Threads.
>
> - The operator submits to a work request queue and consumes from a
> response queue for each of the interpreter thread. There exists one request
> and one response queue per interpreter thread.
>
> - The stragglers will get drained from the response queue for a previously
> submitted request queue.
>
> - The other reason why I chose to implement it this ways is also for some
> of the use case that I foresee in the ML scoring scenarios. In fraud
> systems, if I have a strict SLA to score a model, the main thread in the
> operator is not helping me implement this pattern at all. The caller to the
> Apex application will need to proceed if the scoring gets delayed due to
> whatever reason. However the scoring can continue on the interpreter thread
> and can be drained later ( It is just that the caller did not make use of
> this result but can still be persisted for operators consuming from the
> straggler port.
>
> - There are 3 output ports for this operator. DefaultOutputPort,
> stragglersPort and an errorPort.
>
> - Some libraries like Tensorflow can become really heavy. Tensorflow
> models can execute a tensorflow DAG as part of a model scoring
> implementation and hence I wanted to take the approach of a worker pool.
> Yes your point is valid if we wait for the stragglers to complete in a
> given window. The current implementation does not force to wait for all of
> the stragglers to complete. The stragglers are emitted only when there is a
> new tuple that is being processed. i.e. when a new tuple arrives for
> scoring , the straggler response queue is checked if there are any entries
> and if yes, the responses are emitted into the stragglerPort. This
> essentially means that there are situations when the straggler port is
> emitting the result for a request submitted in the previous window. This
> also implies that idempotency cannot be guaranteed across runs of the same
> input data. In fact all threaded implementations have this issue as
> ordering of the results is not guaranteed to be unique even within a given
> window ?
>
> I can enforce a block/drain at the end of the window to force a completion
> basing on the feedback.
>
>
> Regards,
> Ananth
>
> > On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pr...@datatorrent.com>
> wrote:
> >
> > Hi Anath,
> >
> > Sounds interesting and looks like you have put quite a bit of work on it.
> > Might I suggest changing the title of 2260 to better fit your proposal
> and
> > implementation, mainly so that there is differentiation from 2261.
> >
> > I wanted to discuss the proposal to use multiple threads in an operator
> > instance. Unless the execution threads are blocking for some sort of i/o
> > why would it result in a noticeable performance difference compared to
> > processing in operator thread and running multiple partitions of the
> > operator in container local. By running the processing in a separate
> thread
> > from the operator lifecycle thread you don't still get away from matching
> > the incoming data throughput. The checkpoint will act as a time where you
> > backpressure will start to materialize when the operator would have to
> wait
> > for your background processing to complete to guarantee all data till the
> > checkpoint is processed.
> >
> > Thanks
> >
> >
> > On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <an...@gmail.com>
> wrote:
> >
> >> Hello All,
> >>
> >> I would like to submit the design for the Python execution operator
> before
> >> I raise the pull request so that I can refine the implementation based
> on
> >> feedback. Could you please provide feedback on the design if any and I
> will
> >> raise the PR accordingly.
> >>
> >> - This operator is for the JIRA ticket raised here
> >> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> >> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
> >> - The operator embeds a python interpreter in the operator JVM process
> >> space and is not external to the JVM.
> >> - The implementation is proposing the use of Java Embedded Python ( JEP
> )
> >> given here https://github.com/ninia/jep <https://github.com/ninia/jep>
> >> - The JEP engine is under zlib/libpng license. Since this is an approved
> >> license under https://www.apache.org/legal/resolved.html#category-a <
> >> https://www.apache.org/legal/resolved.html#category-a> I am assuming it
> >> is ok for the community to approve the inclusion of this library
> >> - Python integration is a messy piece due to the nature of dynamic
> >> libraries. All python libraries need to be natively installed. This also
> >> means we will not be able bundle python libraries and dependencies as
> part
> >> of the build into the target JVM container. Hence this operator has the
> >> current limitation of the python binaries installed through an external
> >> process on all of the YARN nodes for now.
> >> - The JEP maven dependency jar in the POM is a JNI wrapper around the
> >> dynamic library that is installed externally to the Apex installation
> >> process on all of the YARN nodes.
> >> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 <
> >> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this issue
> >> in the future.
> >> - The python operator implementation can be extended to py4J based
> >> implementation ( as opposed to in-memory model like JEP ) in the future
> if
> >> required be. JEP is the implementation based on an in-memory design
> pattern.
> >> - The python operator allows for 4 major API patterns
> >>    - Execute a method call by accepting parameters to pass to the
> >> interpreter
> >>    - Execute a python script as given in a file path
> >>    - Evaluate an expression and allows for passing of variables between
> >> the java code and the python in-memory interpreter bridge
> >>    - A handy method wherein a series of instructions can be passed in
> one
> >> single java call ( executed as a sequence of python eval instructions
> under
> >> the hood )
> >> - Automatic garbage collection of the variables that are passed from
> java
> >> code to the in memory python interpreter
> >> - Support for all major python libraries. Tensorflow, Keras, Scikit,
> >> xgboost. Preliminary tests for these libraries seem to work as per code
> >> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> >> apex/apexjvmpython <https://github.com/ananthc/
> >> sampleapps/tree/master/apache-apex/apexjvmpython>
> >> - The implementation allows for SLA based execution model. i.e. the
> >> operator is given a chance to execute the python code and if not
> complete
> >> within a time out, the operator code returns back null.
> >> - A tuple that has become a straggler as per previous point will
> >> automatically be drained off to a different port so that downstream
> >> operators can still consume the straggler if they want to when the
> results
> >> arrive.
> >> - Because of the nature of python being an interpreter and if a previous
> >> tuple is being still processed, there is chance of a back pressure
> pattern
> >> building up very quickly. Hence this operator works on the concept of a
> >> worker pool. The Python operator uses a configurable number of worker
> >> thread each of which embed the Python interpreter within their
> processing
> >> space. i.e. it is in fact a collection of python ink memory interpreters
> >> inside the Python operator implementation.
> >> - The operator chooses one of the threads at runtime basing on their
> busy
> >> state thus allowing for back-pressure issues to be resolved
> automatically.
> >> - There is a first class support for Numpy in JEP. Java arrays would be
> >> convertible to the Python Numpy arrays and vice versa and share the same
> >> memory addresses for efficiency reasons.
> >> - The base operator implements dynamic partitioning based on a thread
> >> starvation policy. At each checkpoint, it checks how much percentage of
> the
> >> requests resulted in starved threads and if the starvation exceeds a
> >> configured percentage, a new instance of the operator is provisioned for
> >> every such instance of the operator
> >> - The operator provides the notion of a worker execution mode. There are
> >> two worker modes that are passed in each of the above calls from the
> user.
> >> ALL or ANY.  Because python interpreter is state based engine, a newly
> >> dynamically partitioned operator might not be in the exact state of the
> >> remaining operators. Hence the operator has this notion of worker
> execution
> >> mode. Any call ( any of the 4 calls mentioned above ) called with ALL
> >> execution mode will be executed on all the workers of the worker thread
> >> pool as well as the dynamically portioned instance whenever such an
> >> instance is provisioned.
> >> - The base operator implementation has a method that can be overridden
> to
> >> implement the logic that needs to be executed for each tuple. The base
> >> operator default implementation is a simple NO-OP.
> >> - The operator automatically picks up the least busy of the thread pool
> >> worker which has JEP embedded in it to execute the call.
> >> - The JEP based installation will not support non Cpython modules. All
> of
> >> the major python libraries are cpython based and hence I believe this
> is of
> >> a lesser concern. If we hit a roadblock when a new python library being
> a
> >> non-Cpython based library needs to be run, then we could implement the
> >> ApexPythonEngine interface to something like Py4J which involves
> >> interprocess communication.
> >> - The python operator requires the user to set the library path
> >> java.library.path for the operator to make use of the dynamic libraries
> of
> >> the corresponding platform. This has to be passed in as the JVM options.
> >> Failing to do so will result in the operator failing to load the
> >> interpreter properly.
> >> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6. Numpy
> >=
> >> 1.7 is supported.
> >> - There is no support for virtual environments yet. In case of multiple
> >> python versions on the node, to include the right python version for the
> >> apex operator, ensure that the environment variables and the dynamic
> >> library path are set appropriately. This is a workaround and I hope
> >> APEXCORE-796 will solve this issue as well.
> >>
> >>
> >> Regards,
> >> Ananth
> >>
> >>
>
>

Re: [Discuss] Design of the python execution operator

Posted by Ananth G <an...@gmail.com>.
Hello Pramod,

Thanks for the comments. I adjusted the title of the JIRA. Here is what I was thinking for the worker pool implementation.

- The main reason ( which I forgot to mention in the design points below ) is that the java embedded engine allows only the thread that created the instance to execute the python logic. This is more because of the JNI specification itself. Some hints here https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads <https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads> and here http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html <http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html>

- This essentially means that the main operator thread will have to call the python code execution logic if the design were otherwise.

- Since the end user can choose to can write any kind of logic including blocking I/O as part of the implementation, I did not want to stall the operator thread for any usage pattern. 

- In fact there is only one overall interpreter in the JVM process space and the interpreter thread is just a JNI wrapper around it to account for the JNI limitations above.

- It is for the very same reason, there is an API in the implementation to support for registering Shared Modules across all of the interpreter threads. Use cases for this exist when there is a global variable provided by the underlying Python library and loading it multiple times can cause issues. Hence the API to register a shared module which can be used by all of the Interpreter Threads. 

- The operator submits to a work request queue and consumes from a response queue for each of the interpreter thread. There exists one request and one response queue per interpreter thread.

- The stragglers will get drained from the response queue for a previously submitted request queue. 

- The other reason why I chose to implement it this ways is also for some of the use case that I foresee in the ML scoring scenarios. In fraud systems, if I have a strict SLA to score a model, the main thread in the operator is not helping me implement this pattern at all. The caller to the Apex application will need to proceed if the scoring gets delayed due to whatever reason. However the scoring can continue on the interpreter thread and can be drained later ( It is just that the caller did not make use of this result but can still be persisted for operators consuming from the straggler port. 

- There are 3 output ports for this operator. DefaultOutputPort, stragglersPort and an errorPort. 

- Some libraries like Tensorflow can become really heavy. Tensorflow models can execute a tensorflow DAG as part of a model scoring implementation and hence I wanted to take the approach of a worker pool. Yes your point is valid if we wait for the stragglers to complete in a given window. The current implementation does not force to wait for all of the stragglers to complete. The stragglers are emitted only when there is a new tuple that is being processed. i.e. when a new tuple arrives for scoring , the straggler response queue is checked if there are any entries and if yes, the responses are emitted into the stragglerPort. This essentially means that there are situations when the straggler port is emitting the result for a request submitted in the previous window. This also implies that idempotency cannot be guaranteed across runs of the same input data. In fact all threaded implementations have this issue as ordering of the results is not guaranteed to be unique even within a given window ?

I can enforce a block/drain at the end of the window to force a completion basing on the feedback. 


Regards,
Ananth

> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pr...@datatorrent.com> wrote:
> 
> Hi Anath,
> 
> Sounds interesting and looks like you have put quite a bit of work on it.
> Might I suggest changing the title of 2260 to better fit your proposal and
> implementation, mainly so that there is differentiation from 2261.
> 
> I wanted to discuss the proposal to use multiple threads in an operator
> instance. Unless the execution threads are blocking for some sort of i/o
> why would it result in a noticeable performance difference compared to
> processing in operator thread and running multiple partitions of the
> operator in container local. By running the processing in a separate thread
> from the operator lifecycle thread you don't still get away from matching
> the incoming data throughput. The checkpoint will act as a time where you
> backpressure will start to materialize when the operator would have to wait
> for your background processing to complete to guarantee all data till the
> checkpoint is processed.
> 
> Thanks
> 
> 
> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <an...@gmail.com> wrote:
> 
>> Hello All,
>> 
>> I would like to submit the design for the Python execution operator before
>> I raise the pull request so that I can refine the implementation based on
>> feedback. Could you please provide feedback on the design if any and I will
>> raise the PR accordingly.
>> 
>> - This operator is for the JIRA ticket raised here
>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
>> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
>> - The operator embeds a python interpreter in the operator JVM process
>> space and is not external to the JVM.
>> - The implementation is proposing the use of Java Embedded Python ( JEP )
>> given here https://github.com/ninia/jep <https://github.com/ninia/jep>
>> - The JEP engine is under zlib/libpng license. Since this is an approved
>> license under https://www.apache.org/legal/resolved.html#category-a <
>> https://www.apache.org/legal/resolved.html#category-a> I am assuming it
>> is ok for the community to approve the inclusion of this library
>> - Python integration is a messy piece due to the nature of dynamic
>> libraries. All python libraries need to be natively installed. This also
>> means we will not be able bundle python libraries and dependencies as part
>> of the build into the target JVM container. Hence this operator has the
>> current limitation of the python binaries installed through an external
>> process on all of the YARN nodes for now.
>> - The JEP maven dependency jar in the POM is a JNI wrapper around the
>> dynamic library that is installed externally to the Apex installation
>> process on all of the YARN nodes.
>> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 <
>> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this issue
>> in the future.
>> - The python operator implementation can be extended to py4J based
>> implementation ( as opposed to in-memory model like JEP ) in the future if
>> required be. JEP is the implementation based on an in-memory design pattern.
>> - The python operator allows for 4 major API patterns
>>    - Execute a method call by accepting parameters to pass to the
>> interpreter
>>    - Execute a python script as given in a file path
>>    - Evaluate an expression and allows for passing of variables between
>> the java code and the python in-memory interpreter bridge
>>    - A handy method wherein a series of instructions can be passed in one
>> single java call ( executed as a sequence of python eval instructions under
>> the hood )
>> - Automatic garbage collection of the variables that are passed from java
>> code to the in memory python interpreter
>> - Support for all major python libraries. Tensorflow, Keras, Scikit,
>> xgboost. Preliminary tests for these libraries seem to work as per code
>> here : https://github.com/ananthc/sampleapps/tree/master/apache-
>> apex/apexjvmpython <https://github.com/ananthc/
>> sampleapps/tree/master/apache-apex/apexjvmpython>
>> - The implementation allows for SLA based execution model. i.e. the
>> operator is given a chance to execute the python code and if not complete
>> within a time out, the operator code returns back null.
>> - A tuple that has become a straggler as per previous point will
>> automatically be drained off to a different port so that downstream
>> operators can still consume the straggler if they want to when the results
>> arrive.
>> - Because of the nature of python being an interpreter and if a previous
>> tuple is being still processed, there is chance of a back pressure pattern
>> building up very quickly. Hence this operator works on the concept of a
>> worker pool. The Python operator uses a configurable number of worker
>> thread each of which embed the Python interpreter within their processing
>> space. i.e. it is in fact a collection of python ink memory interpreters
>> inside the Python operator implementation.
>> - The operator chooses one of the threads at runtime basing on their busy
>> state thus allowing for back-pressure issues to be resolved automatically.
>> - There is a first class support for Numpy in JEP. Java arrays would be
>> convertible to the Python Numpy arrays and vice versa and share the same
>> memory addresses for efficiency reasons.
>> - The base operator implements dynamic partitioning based on a thread
>> starvation policy. At each checkpoint, it checks how much percentage of the
>> requests resulted in starved threads and if the starvation exceeds a
>> configured percentage, a new instance of the operator is provisioned for
>> every such instance of the operator
>> - The operator provides the notion of a worker execution mode. There are
>> two worker modes that are passed in each of the above calls from the user.
>> ALL or ANY.  Because python interpreter is state based engine, a newly
>> dynamically partitioned operator might not be in the exact state of the
>> remaining operators. Hence the operator has this notion of worker execution
>> mode. Any call ( any of the 4 calls mentioned above ) called with ALL
>> execution mode will be executed on all the workers of the worker thread
>> pool as well as the dynamically portioned instance whenever such an
>> instance is provisioned.
>> - The base operator implementation has a method that can be overridden to
>> implement the logic that needs to be executed for each tuple. The base
>> operator default implementation is a simple NO-OP.
>> - The operator automatically picks up the least busy of the thread pool
>> worker which has JEP embedded in it to execute the call.
>> - The JEP based installation will not support non Cpython modules. All of
>> the major python libraries are cpython based and hence I believe this is of
>> a lesser concern. If we hit a roadblock when a new python library being a
>> non-Cpython based library needs to be run, then we could implement the
>> ApexPythonEngine interface to something like Py4J which involves
>> interprocess communication.
>> - The python operator requires the user to set the library path
>> java.library.path for the operator to make use of the dynamic libraries of
>> the corresponding platform. This has to be passed in as the JVM options.
>> Failing to do so will result in the operator failing to load the
>> interpreter properly.
>> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6. Numpy >=
>> 1.7 is supported.
>> - There is no support for virtual environments yet. In case of multiple
>> python versions on the node, to include the right python version for the
>> apex operator, ensure that the environment variables and the dynamic
>> library path are set appropriately. This is a workaround and I hope
>> APEXCORE-796 will solve this issue as well.
>> 
>> 
>> Regards,
>> Ananth
>> 
>> 


Re: [Discuss] Design of the python execution operator

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Hi Anath,

Sounds interesting and looks like you have put quite a bit of work on it.
Might I suggest changing the title of 2260 to better fit your proposal and
implementation, mainly so that there is differentiation from 2261.

I wanted to discuss the proposal to use multiple threads in an operator
instance. Unless the execution threads are blocking for some sort of i/o
why would it result in a noticeable performance difference compared to
processing in operator thread and running multiple partitions of the
operator in container local. By running the processing in a separate thread
from the operator lifecycle thread you don't still get away from matching
the incoming data throughput. The checkpoint will act as a time where you
backpressure will start to materialize when the operator would have to wait
for your background processing to complete to guarantee all data till the
checkpoint is processed.

Thanks


On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <an...@gmail.com> wrote:

> Hello All,
>
> I would like to submit the design for the Python execution operator before
> I raise the pull request so that I can refine the implementation based on
> feedback. Could you please provide feedback on the design if any and I will
> raise the PR accordingly.
>
> - This operator is for the JIRA ticket raised here
> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
> - The operator embeds a python interpreter in the operator JVM process
> space and is not external to the JVM.
> - The implementation is proposing the use of Java Embedded Python ( JEP )
> given here https://github.com/ninia/jep <https://github.com/ninia/jep>
> - The JEP engine is under zlib/libpng license. Since this is an approved
> license under https://www.apache.org/legal/resolved.html#category-a <
> https://www.apache.org/legal/resolved.html#category-a> I am assuming it
> is ok for the community to approve the inclusion of this library
> - Python integration is a messy piece due to the nature of dynamic
> libraries. All python libraries need to be natively installed. This also
> means we will not be able bundle python libraries and dependencies as part
> of the build into the target JVM container. Hence this operator has the
> current limitation of the python binaries installed through an external
> process on all of the YARN nodes for now.
> - The JEP maven dependency jar in the POM is a JNI wrapper around the
> dynamic library that is installed externally to the Apex installation
> process on all of the YARN nodes.
> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 <
> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this issue
> in the future.
> - The python operator implementation can be extended to py4J based
> implementation ( as opposed to in-memory model like JEP ) in the future if
> required be. JEP is the implementation based on an in-memory design pattern.
> - The python operator allows for 4 major API patterns
>     - Execute a method call by accepting parameters to pass to the
> interpreter
>     - Execute a python script as given in a file path
>     - Evaluate an expression and allows for passing of variables between
> the java code and the python in-memory interpreter bridge
>     - A handy method wherein a series of instructions can be passed in one
> single java call ( executed as a sequence of python eval instructions under
> the hood )
> - Automatic garbage collection of the variables that are passed from java
> code to the in memory python interpreter
> - Support for all major python libraries. Tensorflow, Keras, Scikit,
> xgboost. Preliminary tests for these libraries seem to work as per code
> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> apex/apexjvmpython <https://github.com/ananthc/
> sampleapps/tree/master/apache-apex/apexjvmpython>
> - The implementation allows for SLA based execution model. i.e. the
> operator is given a chance to execute the python code and if not complete
> within a time out, the operator code returns back null.
> - A tuple that has become a straggler as per previous point will
> automatically be drained off to a different port so that downstream
> operators can still consume the straggler if they want to when the results
> arrive.
> - Because of the nature of python being an interpreter and if a previous
> tuple is being still processed, there is chance of a back pressure pattern
> building up very quickly. Hence this operator works on the concept of a
> worker pool. The Python operator uses a configurable number of worker
> thread each of which embed the Python interpreter within their processing
> space. i.e. it is in fact a collection of python ink memory interpreters
> inside the Python operator implementation.
> - The operator chooses one of the threads at runtime basing on their busy
> state thus allowing for back-pressure issues to be resolved automatically.
> - There is a first class support for Numpy in JEP. Java arrays would be
> convertible to the Python Numpy arrays and vice versa and share the same
> memory addresses for efficiency reasons.
> - The base operator implements dynamic partitioning based on a thread
> starvation policy. At each checkpoint, it checks how much percentage of the
> requests resulted in starved threads and if the starvation exceeds a
> configured percentage, a new instance of the operator is provisioned for
> every such instance of the operator
> - The operator provides the notion of a worker execution mode. There are
> two worker modes that are passed in each of the above calls from the user.
> ALL or ANY.  Because python interpreter is state based engine, a newly
> dynamically partitioned operator might not be in the exact state of the
> remaining operators. Hence the operator has this notion of worker execution
> mode. Any call ( any of the 4 calls mentioned above ) called with ALL
> execution mode will be executed on all the workers of the worker thread
> pool as well as the dynamically portioned instance whenever such an
> instance is provisioned.
> - The base operator implementation has a method that can be overridden to
> implement the logic that needs to be executed for each tuple. The base
> operator default implementation is a simple NO-OP.
> - The operator automatically picks up the least busy of the thread pool
> worker which has JEP embedded in it to execute the call.
> - The JEP based installation will not support non Cpython modules. All of
> the major python libraries are cpython based and hence I believe this is of
> a lesser concern. If we hit a roadblock when a new python library being a
> non-Cpython based library needs to be run, then we could implement the
> ApexPythonEngine interface to something like Py4J which involves
> interprocess communication.
> - The python operator requires the user to set the library path
> java.library.path for the operator to make use of the dynamic libraries of
> the corresponding platform. This has to be passed in as the JVM options.
> Failing to do so will result in the operator failing to load the
> interpreter properly.
> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6. Numpy >=
> 1.7 is supported.
> - There is no support for virtual environments yet. In case of multiple
> python versions on the node, to include the right python version for the
> apex operator, ensure that the environment variables and the dynamic
> library path are set appropriately. This is a workaround and I hope
> APEXCORE-796 will solve this issue as well.
>
>
> Regards,
> Ananth
>
>