You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2018/03/26 13:49:09 UTC
[apex-malhar] branch master updated:
APEXMALHAR-2260.PythonExecutionOperator Implement python code as an Apex
operator
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 55a6abb APEXMALHAR-2260.PythonExecutionOperator Implement python code as an Apex operator
55a6abb is described below
commit 55a6abb64160e6d0228c26d347ff6c83b5bfd3ad
Author: Ananth Gundabattula <an...@apache.org>
AuthorDate: Wed Mar 7 17:00:09 2018 +1100
APEXMALHAR-2260.PythonExecutionOperator Implement python code as an Apex operator
---
pom.xml | 1 +
python/pom.xml | 94 ++++
.../apex/malhar/python/base/ApexPythonEngine.java | 185 ++++++
.../base/ApexPythonInterpreterException.java | 35 ++
.../python/base/BasePythonExecutionOperator.java | 601 ++++++++++++++++++++
.../python/base/PythonInterpreterConfig.java | 37 ++
.../malhar/python/base/WorkerExecutionMode.java | 43 ++
.../malhar/python/base/jep/InterpreterThread.java | 626 +++++++++++++++++++++
.../malhar/python/base/jep/InterpreterWrapper.java | 350 ++++++++++++
.../malhar/python/base/jep/JepPythonEngine.java | 621 ++++++++++++++++++++
.../apex/malhar/python/base/jep/SpinPolicy.java | 25 +
.../apex/malhar/python/base/jep/package-info.java | 22 +
.../apex/malhar/python/base/package-info.java | 22 +
.../AbstractPythonExecutionPartitioner.java | 75 +++
.../PythonExecutionPartitionerType.java | 24 +
.../ThreadStarvationBasedPartitioner.java | 92 +++
.../python/base/partitioner/package-info.java | 22 +
.../requestresponse/EvalCommandRequestPayload.java | 72 +++
.../GenericCommandsRequestPayload.java | 36 ++
.../requestresponse/MethodCallRequestPayload.java | 48 ++
.../base/requestresponse/PythonCommandType.java | 27 +
.../requestresponse/PythonInterpreterRequest.java | 130 +++++
.../requestresponse/PythonInterpreterResponse.java | 61 ++
.../requestresponse/PythonRequestResponse.java | 96 ++++
.../ScriptExecutionRequestPayload.java | 34 ++
.../python/base/requestresponse/package-info.java | 22 +
.../python/base/util/NDArrayKryoSerializer.java | 170 ++++++
.../malhar/python/base/util/NDimensionalArray.java | 102 ++++
.../base/util/PythonRequestResponseUtil.java | 133 +++++
.../malhar/python/PythonExecutorApplication.java | 60 ++
.../python/PythonExecutorApplicationTest.java | 96 ++++
.../malhar/python/PythonPayloadPOJOGenerator.java | 169 ++++++
.../apex/malhar/python/PythonProcessingPojo.java | 75 +++
.../apex/malhar/python/SimplePythonOpOperator.java | 101 ++++
.../apex/malhar/python/base/jep/BaseJEPTest.java | 211 +++++++
.../python/base/jep/InterpreterThreadTest.java | 176 ++++++
.../python/base/jep/InterpreterWrapperTest.java | 97 ++++
.../python/base/jep/JepPythonEngineTest.java | 172 ++++++
.../apex/malhar/python/test/BasePythonTest.java | 38 ++
.../malhar/python/test/JepPythonTestContext.java | 37 ++
.../python/test/PythonAvailabilityTestRule.java | 80 +++
python/src/test/resources/META-INF/properties.xml | 28 +
python/src/test/resources/factorial.py | 49 ++
python/src/test/resources/log4j.properties | 34 ++
44 files changed, 5229 insertions(+)
diff --git a/pom.xml b/pom.xml
index c8925c4..0f62475 100644
--- a/pom.xml
+++ b/pom.xml
@@ -216,6 +216,7 @@
<module>contrib</module>
<module>kafka</module>
<module>kudu</module>
+ <module>python</module>
<module>examples</module>
</modules>
diff --git a/python/pom.xml b/python/pom.xml
new file mode 100755
index 0000000..9a2d13f
--- /dev/null
+++ b/python/pom.xml
@@ -0,0 +1,94 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-python</artifactId>
+ <name>Apex library python support</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <jep-version>3.7.0</jep-version>
+ <jepInstallPath>/usr/local/lib/python3.5/site-packages/jep</jepInstallPath> <!-- Default location only; ideally overridden from the command line (-DjepInstallPath=...) based on the environment.
+ A default value is kept so that IDEs validating this pom.xml do not report an undefined property where it is referenced in the configs -->
+ <disruptor-queue-conversant-media-version>1.2.10</disruptor-queue-conversant-media-version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.core.version}</version>
+ <type>jar</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>black.ninia</groupId>
+ <artifactId>jep</artifactId>
+ <version>${jep-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.conversantmedia</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor-queue-conversant-media-version}</version>
+ <classifier>jdk7</classifier><!-- Set classifier to jdk8 when malhar switches to JDK 8 -->
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>once</forkMode>
+ <argLine>-Djava.library.path=${jepInstallPath}</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java
new file mode 100644
index 0000000..6370e32
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+/**
+ * Defines the methods that need to be implemented by Python engine implementations. The first implementation takes
+ * the approach of an in-memory interpreter using JEP. Another possibility is Py4J, which follows an inter-process
+ * communication model. An Apex operator uses an instance of one of these engine implementations to run python code.
+ *
+ * <p>The request execution methods ({@link #runCommands}, {@link #executeMethodCall}, {@link #executeScript} and
+ * {@link #eval}) share the following routing parameters:</p>
+ * <ul>
+ * <li>{@code executionMode} decides whether the request is run on all worker threads or on any single worker thread.
+ * See {@link WorkerExecutionMode} for the available choices.</li>
+ * <li>{@code windowId} and {@code requestId} are used together to select a worker from the worker pool; for ANY
+ * worker execution they implement a round robin selection of the next worker. The window id gains significance if
+ * a sticky worker is implemented in the future: python interpreter state accumulates as commands run and a command
+ * can reference a variable created by a previous command, so an application may need all of its commands routed to
+ * one specific interpreter instance. If the engine is not used directly by an operator implementation, the caller
+ * can pass in any numbers, as these values are used for nothing more than selecting a worker.</li>
+ * </ul>
+ *
+ * <p>The maps returned by these methods hold {@link PythonRequestResponse} values (not booleans); each response
+ * object carries the outcome of an execution. NOTE(review): the meaning of the String key (worker id or command) is
+ * not fixed by this interface - confirm against the engine implementation.</p>
+ */
+public interface ApexPythonEngine
+{
+  /***
+   * Performs any processing that must happen before the interpreter is started.
+   * @param preInitConfigs The configuration that is going to be used by the interpreter
+   * @throws ApexPythonInterpreterException if there is an issue in executing the pre interpreter logic
+   */
+  void preInitInterpreter(Map<PythonInterpreterConfig,Object> preInitConfigs) throws ApexPythonInterpreterException;
+
+  /***
+   * Starts the interpreter.
+   * @throws ApexPythonInterpreterException if the library cannot be located or there is any other issue starting the
+   *     interpreter
+   */
+  void startInterpreter() throws ApexPythonInterpreterException;
+
+  /***
+   * Performs any logic that needs to be executed after the interpreter is started but before any tuples start
+   * getting processed, for example setting the starting state of the variables that are used in tuple processing.
+   * @throws ApexPythonInterpreterException if the post start logic cannot be executed
+   */
+  void postStartInterpreter() throws ApexPythonInterpreterException;
+
+  /***
+   * Runs a series of commands. The implementation engine could make use of a worker pool to execute the commands.
+   * @param executionMode whether the commands run on all workers or on any single worker; see
+   *     {@link WorkerExecutionMode}
+   * @param windowId used along with {@code requestId} to select a worker (see the interface documentation)
+   * @param requestId used along with {@code windowId} to select a worker (see the interface documentation)
+   * @param request Represents the request to be processed.
+   * @return the responses of the executed commands; may be {@code null} when no worker thread was free to process
+   *     the request (such returns are counted by {@link #getNumStarvedReturns()})
+   * @throws ApexPythonInterpreterException if interrupted or if the command cannot be executed
+   */
+  Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode, long windowId, long requestId,
+      PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
+
+  /***
+   * Executes a method call.
+   * @param executionMode whether the method call is invoked on all workers or on any single worker; see
+   *     {@link WorkerExecutionMode}
+   * @param windowId used along with {@code requestId} to select a worker (see the interface documentation)
+   * @param requestId used along with {@code windowId} to select a worker (see the interface documentation)
+   * @param req Represents the request to be processed.
+   * @param <T> type of the value returned by the python method call
+   * @return the responses of the method call; may be {@code null} when no worker thread was free to process the
+   *     request (such returns are counted by {@link #getNumStarvedReturns()})
+   * @throws ApexPythonInterpreterException if the method call cannot be executed
+   */
+  <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException;
+
+  /***
+   * Executes a script that is locatable via a file path.
+   * @param executionMode whether the script is run on all workers or on any single worker; see
+   *     {@link WorkerExecutionMode}
+   * @param windowId used along with {@code requestId} to select a worker (see the interface documentation)
+   * @param requestId used along with {@code windowId} to select a worker (see the interface documentation)
+   * @param request Represents the request to be processed.
+   * @return the responses of the script execution; may be {@code null} when no worker thread was free to process the
+   *     request (such returns are counted by {@link #getNumStarvedReturns()})
+   * @throws ApexPythonInterpreterException if the script cannot be executed
+   */
+  Map<String,PythonRequestResponse<Void>> executeScript(WorkerExecutionMode executionMode,long windowId,long requestId,
+      PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
+
+  /***
+   * Evaluates a string as a python expression and also supports passing in variables from the JVM to the python
+   * interpreter.
+   * @param executionMode whether the expression is evaluated on all workers or on any single worker; see
+   *     {@link WorkerExecutionMode}
+   * @param windowId used along with {@code requestId} to select a worker (see the interface documentation)
+   * @param requestId used along with {@code windowId} to select a worker (see the interface documentation)
+   * @param req Represents the request to be processed.
+   * @param <T> type of the value the evaluated expression is expected to produce
+   * @return the responses of the evaluation; may be {@code null} when no worker thread was free to process the
+   *     request (such returns are counted by {@link #getNumStarvedReturns()})
+   * @throws ApexPythonInterpreterException if the expression cannot be evaluated
+   */
+  <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode, long windowId, long requestId,
+      PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException;
+
+  /***
+   * @return The queue that holds all of the straggler responses.
+   */
+  BlockingQueue<PythonRequestResponse> getDelayedResponseQueue();
+
+  /***
+   * @param delayedResponseQueue The queue that is to hold the straggler responses.
+   */
+  void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue);
+
+  /***
+   * @return The number of times the engine could not process a request as there were no free worker threads and hence
+   *     had to return null
+   */
+  long getNumStarvedReturns();
+
+  /***
+   * Overwrites the starved returns counter, for example to reset it to zero.
+   * @param numStarvedReturns the new value of the counter
+   */
+  void setNumStarvedReturns(long numStarvedReturns);
+
+  /**
+   * Returns all of the commands that were executed on all of the worker nodes.
+   * @return History of all commands executed in sequence
+   */
+  List<PythonRequestResponse> getCommandHistory();
+
+  /**
+   * @param commandHistory the history of executed commands, in execution order
+   */
+  void setCommandHistory(List<PythonRequestResponse> commandHistory);
+
+  /***
+   * Stops the interpreter.
+   * @throws ApexPythonInterpreterException if there is an issue stopping the interpreter
+   */
+  void stopInterpreter() throws ApexPythonInterpreterException;
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java
new file mode 100644
index 0000000..be64d3e
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+/**
+ * Unchecked exception signalling a failure while using an {@link ApexPythonEngine}, for example while
+ * pre-initializing, starting, running requests on or stopping its interpreter.
+ */
+public class ApexPythonInterpreterException extends RuntimeException
+{
+  /**
+   * Creates the exception with a human readable description of the problem.
+   *
+   * @param message description of what went wrong
+   */
+  public ApexPythonInterpreterException(String message)
+  {
+    super(message);
+  }
+
+  /**
+   * Wraps an underlying failure.
+   *
+   * @param rootCause the throwable that caused this exception
+   */
+  public ApexPythonInterpreterException(Throwable rootCause)
+  {
+    super(rootCause);
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java b/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java
new file mode 100644
index 0000000..cfc2056
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.jep.InterpreterThread;
+import org.apache.apex.malhar.python.base.jep.JepPythonEngine;
+import org.apache.apex.malhar.python.base.jep.SpinPolicy;
+import org.apache.apex.malhar.python.base.partitioner.AbstractPythonExecutionPartitioner;
+import org.apache.apex.malhar.python.base.partitioner.PythonExecutionPartitionerType;
+import org.apache.apex.malhar.python.base.partitioner.ThreadStarvationBasedPartitioner;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/***
+ * <p>The operator provides a mechanism to execute arbitrary python code by using the {@link JepPythonEngine} as its
+ * default engine.</p>
+ *
+ * <p>See {@link JepPythonEngine} and {@link InterpreterThread} for more detailed javadocs about the interpreter
+ * itself and the API patterns possible through this engine</p>
+ *
+ * <p>Note that the JVM option of the library path needs to be set to a value which contains the JEP dynamic library.
+ * For example, -Djava.library.path=/usr/local/lib/python3.5/site-packages/jep assuming that the JEP library is
+ * installed at the above location
+ * </p>
+ *
+ * <p>If using CPython libraries which involve global variables, please use the
+ * {@link PythonInterpreterConfig#PYTHON_SHARED_LIBS} as one of the keys to specify this library as a shared library.
+ * Note that the interpreter configs are passed via {@link ApexPythonEngine#preInitInterpreter(Map)} method.
+ * Overriding the {@link BasePythonExecutionOperator#getPreInitConfigurations()} and specifying the configs can
+ * help in specifying the shared libraries that are loaded by all the interpreter threads accordingly.</p>
+ *
+ * <p>The operator comes with the following limitations
+ * <ol>
+ * <li>Cannot be used in THREAD LOCAL mode when the downstream operator is using a different version of the
+ * python interpreter</li>
+ * <li>In certain use cases the operator cannot be used in CONTAINER LOCAL mode when there are global defs in the
+ * CPython library that is being used and the downstream operator depends on those globals (even when the downstream
+ * operator is using the same python version as the upstream operator)</li>
+ * <li>Only CPython libraries are supported and uses the JNI mechanisms to use the CPython libraries</li>
+ * <li>Complex data types cannot be automatically translated to the Python interpreter space. Current
+ * workaround involves copying the basic types and build the complex type using python code and functionality
+ * provided by the {@link JepPythonEngine} </li>
+ * <li>Numpy arrays need to be specified as {@link NDimensionalArray} and the engine automatically translates them
+ * to a Numpy array. See {@link NDimensionalArray#toNDArray()} for more details</li>
+ * </ol>
+ * </p>
+ * <p>
+ * Shared libraries enable sharing of global modules across interpreter workers in a worker pool.
+ * The following snippet of code illustrates registering numpy as a shared module.
+ * <code>
+ * Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+ * Set<String> sharedLibsList = new HashSet<>();
+ * sharedLibsList.add("numpy");
+ * preInitConfigs.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibsList);
+ * </code>
+ * Passing the above config in overriding {@link BasePythonExecutionOperator#getPreInitConfigurations()} will help
+ * in using modules like numpy which have global shared variables.
+ * </p>
+ * <p>
+ * If running the operator code fails with an exception such as "module 'jep' has no attribute 'java_import_hook'",
+ * JEP was most likely not able to load a library that the python code is using. Ensure that the PYTHONPATH
+ * environment variable is set and points to the right location. This is especially important when there are
+ * multiple python versions in the environment.
+ * </p>
+ *
+ * <p>
+ * Java 9 is not yet supported. Support only exists for Java 8 and Java 7 runtimes.
+ * </p>
+ *
+ * <p>For very low latency SLA requirements, it is recommended to set the spin policy to busy wait using the
+ * following configuration snippet for the JEP engine:
+ * <code>
+ * Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+ * preInitConfigs.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, ""+ SpinPolicy.BUSY_SPIN.name());
+ * </code>
+ * This mapping can be set by overriding {@link BasePythonExecutionOperator#getPreInitConfigurations()}. For more
+ * details refer {@link SpinPolicy}.
+ * </p>
+ * @param <T> Represents the incoming tuple.
+ */
+public class BasePythonExecutionOperator<T> extends BaseOperator implements
+ Operator.ActivationListener<Context.OperatorContext>, Partitioner<BasePythonExecutionOperator>,
+ Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+ private static final transient Logger LOG = LoggerFactory.getLogger(BasePythonExecutionOperator.class);
+
+ /* Rolling count of tuples received on the input port in the current window (incremented in input.process) */
+ protected transient long requestCounterForThisWindow = 0;
+
+ /* Count of responses accounted for in the current window: tuples emitted on the output port, error tuples and
+ emitted stragglers all increment it */
+ protected transient long responseCounterForThisWindow = 0;
+
+ /***
+ * Blocks the operator thread in endWindow until all of the tuples for the window are successfully emitted. Note
+ * that these might be emitted from the stragglers port as well.
+ */
+ protected boolean blockAtEndOfWindowForStragglers = false;
+
+ /** Id of the window being processed. NOTE(review): presumably updated in beginWindow, which is not visible here */
+ protected transient long currentWindowId = 0;
+
+ /** Number of tuples received on the input port (incremented in input.process). NOTE(review): presumably reset at
+ * each checkpoint as the name suggests - the reset is not visible here */
+ private long numberOfRequestsProcessedPerCheckpoint = 0;
+
+ /*** Threshold used while planning dynamic partitioning to decide whether a new operator instance needs to be
+ * spawned. It is the percentage of requests that were starved because no worker thread was available to process
+ * them. See {@link ThreadStarvationBasedPartitioner}
+ */
+ private float starvationPercentBeforeSpawningNewInstance = 30;
+
+ /** Engine executing the python code; transient, created in setup through initApexPythonEngineImpl */
+ private transient ApexPythonEngine apexPythonEngine;
+
+ /*** Number of workers in the worker pool; passed to the JepPythonEngine constructor */
+ private int workerThreadPoolSize = 3;
+
+ /** Additional file system paths that the interpreter can use to locate modules. Use
+ * commas as separator when representing a collection of strings */
+ private String interpreterEngineIncludePaths;
+
+ /** Shared modules that an interpreter engine can load if working within the JVM memory space. Use
+ * commas as separator when representing a collection of strings */
+ private String interpreterEngineSharedLibNames;
+
+ /** Behaviour of the interpreter thread when no requests are found in its queue. Used to tune
+ * for extremely sensitive computation SLAs. See {@link SpinPolicy} for the possible string values */
+ private String idleInterpreterSpinPolicy = SpinPolicy.SLEEP.name();
+
+ /** Behaviour of the disruptor queue that processes request/response dispatches. Tune this
+ * to manage low latency behaviour. See {@link SpinPolicy} for the possible string values */
+ private String requestQueueWaitSpinPolicy = SpinPolicy.SLEEP.name();
+
+ /** NOTE(review): not used in this part of the file; presumably the time in milliseconds an idle interpreter sleeps
+ * when no requests are queued - confirm */
+ private int sleepTimeInMSInCaseOfNoRequests = 5;
+
+
+ /*** Time the python engine waits after starting the interpreter before its workers are used to execute work
+ * requests. Handed to JepPythonEngine#setSleepTimeAfterInterpreterStart (via getSleepTimeDuringInterpreterBoot)
+ * in initApexPythonEngineImpl. NOTE(review): presumably milliseconds - confirm against the engine
+ */
+ private long sleepTimeDuringInterpreterBoot = 2000L;
+
+ /** Partitioner implementation to use; only THREAD_STARVATION_BASED is implemented (see initPartitioner) */
+ private PythonExecutionPartitionerType partitionerType = PythonExecutionPartitionerType.THREAD_STARVATION_BASED;
+
+ /** Lazily created by initPartitioner according to partitionerType */
+ private transient AbstractPythonExecutionPartitioner partitioner;
+
+ /*** Port into which the stragglers are emitted */
+ public final transient DefaultOutputPort<PythonRequestResponse> stragglersPort =
+ new com.datatorrent.api.DefaultOutputPort<>();
+
+ /*** Port into which the normal execution path pushes the results */
+ public final transient DefaultOutputPort<PythonRequestResponse> outputPort =
+ new com.datatorrent.api.DefaultOutputPort<>();
+
+ /*** Port into which all error tuples are emitted; carries the original input tuple */
+ public final transient DefaultOutputPort<T> errorPort = new com.datatorrent.api.DefaultOutputPort<>();
+
+ /** Monitor used by initPartitioner while lazily creating the partitioner */
+ private Object objectForLocking = new Object();
+
+ /*** A counter that is used to track how many times a request could not be serviced because no worker thread was
+ free. Used by the {@link ThreadStarvationBasedPartitioner} to spawn a new instance of the operator based on a
+ configuration threshold. NOTE(review): the name suggests a per-checkpoint count; where it is updated and reset is
+ not visible here - confirm
+ */
+ @AutoMetric
+ private long numStarvedReturnsPerCheckpoint = 0;
+
+ /** Number of tuples for which processing returned a null response (incremented in input.process) */
+ @AutoMetric
+ private long numNullResponsesPerWindow = 0;
+
+ /***
+ * Represents all python commands that need to be run on a new instance of the operator after dynamic partitioning
+ */
+ private List<PythonRequestResponse> accumulatedCommandHistory = new ArrayList<>();
+
+ /***
+ * Processes the incoming tuple using the python engine that is injected. Also emits stragglers if any.
+ */
+ @InputPortFieldAnnotation
+ public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ numberOfRequestsProcessedPerCheckpoint += 1;
+ requestCounterForThisWindow += 1;
+ emitStragglers();
+ try {
+ PythonRequestResponse result = processPythonCodeForIncomingTuple(tuple,getApexPythonEngine());
+ if ( result != null) {
+ responseCounterForThisWindow += 1;
+ outputPort.emit(result);
+ } else {
+ numNullResponsesPerWindow += 1;
+ }
+ } catch (ApexPythonInterpreterException e) {
+ responseCounterForThisWindow += 1; // Error tuples need to be accounted for totals
+ errorPort.emit(tuple);
+ LOG.debug("Error while processing tuple", e);
+ }
+ }
+ };
+
+
+  /**
+   * Resets the engine's starved-returns counter so that each activation starts counting from zero.
+   *
+   * @param context the operator context (not used)
+   */
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    final ApexPythonEngine engine = getApexPythonEngine();
+    engine.setNumStarvedReturns(0L);
+  }
+
+  /**
+   * No-op: this operator performs no work on deactivation.
+   */
+  @Override
+  public void deactivate()
+  {
+    // intentionally empty
+  }
+
+  /***
+   * Drains every response currently waiting in the engine's delayed response queue and emits each one on the
+   * stragglers port, counting it as a response of the current window. Called when a new tuple arrives, when idle
+   * time is detected and, if configured, while blocking at the end of a window.
+   */
+  private void emitStragglers()
+  {
+    LOG.debug("Emitting stragglers");
+    List<PythonRequestResponse> drained = new ArrayList<>();
+    getApexPythonEngine().getDelayedResponseQueue().drainTo(drained);
+    for (int index = 0; index < drained.size(); index++) {
+      responseCounterForThisWindow += 1;
+      stragglersPort.emit(drained.get(index));
+    }
+  }
+
+  /***
+   * Creates the python engine used by this operator. Only the in-memory JEP implementation is provided: the
+   * engine is named after the operator id, sized with {@code workerThreadPoolSize} worker threads and configured
+   * with the boot sleep time of this operator.
+   * @param context The operator context, whose id becomes the engine id
+   * @return The newly created (not yet started) python engine
+   */
+  protected ApexPythonEngine initApexPythonEngineImpl(Context.OperatorContext context)
+  {
+    String engineId = String.valueOf(context.getId());
+    JepPythonEngine engine = new JepPythonEngine(engineId, workerThreadPoolSize);
+    engine.setSleepTimeAfterInterpreterStart(getSleepTimeDuringInterpreterBoot());
+    LOG.debug("Initialized Python engine " + engine);
+    return engine;
+  }
+
+  /***
+   * Lazily creates the partitioner for the configured partitioner type. Only the thread starvation based
+   * partitioner exists for now; it is also used for any other value.
+   * NOTE(review): the unsynchronized first null check is only safe if the {@code partitioner} field is declared
+   * volatile; its declaration is not visible here - confirm.
+   */
+  private void initPartitioner()
+  {
+    if (partitioner != null) {
+      return;
+    }
+    synchronized (objectForLocking) {
+      if (partitioner != null) {
+        return;
+      }
+      switch (partitionerType) {
+        case THREAD_STARVATION_BASED:
+        default:
+          partitioner = new ThreadStarvationBasedPartitioner(this);
+          break;
+      }
+    }
+  }
+
+  /***
+   * Creates and starts the python engine. The engine sleeps for a configurable time after start-up so that the
+   * interpreter is completely booted in memory; how long that takes depends on the libraries being loaded, so tune
+   * sleepTimeDuringInterpreterBoot accordingly. Finally the post set-up hook
+   * {@link #processPostSetUpPythonInstructions(ApexPythonEngine)} is invoked.
+   * @param context The operator context
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    LOG.debug("Initializing the python interpreter setup");
+    apexPythonEngine = initApexPythonEngineImpl(context);
+    Map<PythonInterpreterConfig, Object> interpreterConfigs = getPreInitConfigurations();
+    apexPythonEngine.preInitInterpreter(interpreterConfigs);
+    apexPythonEngine.startInterpreter();
+    LOG.debug("Python interpreter started. Now invoking post interpreter logic");
+    processPostSetUpPythonInstructions(apexPythonEngine);
+    LOG.debug("Python post interpreter logic complete");
+  }
+
+  /***
+   * Stops the python interpreter, if one was created. The engine can be missing when setup() failed before it
+   * was initialized or when it was cleared through setApexPythonEngine(null); previously this caused a
+   * NullPointerException during teardown.
+   */
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    if (apexPythonEngine == null) {
+      LOG.debug("No python engine to stop during teardown");
+      return;
+    }
+    apexPythonEngine.stopInterpreter();
+  }
+
+  /***
+   * Records the current window id and resets the per-window request, response and null-response counters.
+   * @param windowId The id of the window that is starting
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    currentWindowId = windowId;
+    // Per-window bookkeeping starts from zero
+    requestCounterForThisWindow = 0;
+    responseCounterForThisWindow = 0;
+    numNullResponsesPerWindow = 0;
+  }
+
+  /***
+   * Optionally waits for stragglers at the end of the window.
+   * <p>If fewer responses than requests were accounted for in this window and blockAtEndOfWindowForStragglers is
+   * true (via a setter or config), the delayed response queue is drained repeatedly, sleeping 50 ms between
+   * attempts, until every request of the window has been answered. No sleep happens once nothing is outstanding.</p>
+   * <p>If the flag is false, there is no waiting. The remaining stragglers are emitted later, when a new tuple
+   * arrives or idle time is detected.</p>
+   * NOTE(review): if a request never yields a response, the blocking loop does not terminate - confirm that the
+   * engine eventually delivers a (possibly delayed) response for every accepted request.
+   * @throws RuntimeException wrapping the InterruptedException if interrupted while waiting; the thread's interrupt
+   *                          status is restored first
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    if (responseCounterForThisWindow >= requestCounterForThisWindow) {
+      return;
+    }
+    LOG.debug("Detected stragglers and configured flag/state for blocking at end of window is " +
+        blockAtEndOfWindowForStragglers);
+    if (!blockAtEndOfWindowForStragglers) {
+      return;
+    }
+    LOG.debug("Trying to emit all stragglers before the next window can be processed");
+    emitStragglers();
+    // Only sleep while something is still outstanding; avoids a pointless 50 ms delay after the final flush
+    while (responseCounterForThisWindow < requestCounterForThisWindow) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so the container thread can still observe the interruption
+        Thread.currentThread().interrupt();
+        LOG.debug("Thread interrupted while trying to emit all stragglers");
+        throw new RuntimeException(e);
+      }
+      emitStragglers();
+    }
+  }
+
+  /***
+   * Delegates repartitioning to the lazily created partitioner.
+   * @param partitions The current partitions
+   * @param context The partitioning context
+   * @return The new set of partitions as decided by the partitioner
+   */
+  @Override
+  public Collection<Partition<BasePythonExecutionOperator>> definePartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    initPartitioner();
+    return partitioner.definePartitions(partitions, context);
+  }
+
+  /***
+   * Delegates the post-partitioning callback to the lazily created partitioner.
+   * @param partitions The partitions keyed by their ids
+   */
+  @Override
+  public void partitioned(Map<Integer, Partition<BasePythonExecutionOperator>> partitions)
+  {
+    initPartitioner();
+    partitioner.partitioned(partitions);
+  }
+
+  /***
+   * Copies the engine's command history and starved-return counter into operator fields so that both become part
+   * of the checkpointed state (used when the operator is repartitioned).
+   * @param windowId The window being checkpointed
+   */
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    accumulatedCommandHistory.clear();
+    accumulatedCommandHistory.addAll(getApexPythonEngine().getCommandHistory());
+    numStarvedReturnsPerCheckpoint = getApexPythonEngine().getNumStarvedReturns();
+  }
+
+  /***
+   * Starts a new checkpoint period by resetting the per-checkpoint request counter and the engine's starved-return
+   * counter.
+   * @param windowId The window that was checkpointed
+   */
+  @Override
+  public void checkpointed(long windowId)
+  {
+    numberOfRequestsProcessedPerCheckpoint = 0;
+    getApexPythonEngine().setNumStarvedReturns(0L);
+  }
+
+  /***
+   * No-op: nothing needs to happen when a window is committed.
+   * @param windowId The committed window
+   */
+  @Override
+  public void committed(long windowId)
+  {
+    // intentionally empty
+  }
+
+  /***
+   * Hook invoked by setup() right after the interpreter has been started. Override it to add logic that is needed
+   * immediately after interpreter setup, for example preparing interpreter state that all later tuple invocations
+   * can use. The default implementation runs the engine's own post-start logic, so overrides that still need it
+   * should call {@code super}.
+   * @param pythonEngine The engine whose interpreter has just been started; this is the engine acted upon
+   * @throws ApexPythonInterpreterException as propagated from the engine's post start logic
+   */
+  public void processPostSetUpPythonInstructions(ApexPythonEngine pythonEngine) throws ApexPythonInterpreterException
+  {
+    // Use the engine handed in, not the field; previously the parameter was silently ignored
+    pythonEngine.postStartInterpreter();
+  }
+
+  /***
+   * Uses idle time to push out any stragglers that have completed in the background.
+   */
+  @Override
+  public void handleIdleTime()
+  {
+    // Idle periods are a good moment to flush delayed responses
+    emitStragglers();
+  }
+
+  /***
+   * @return the python engine used by this operator; created in setup() unless injected via the setter
+   */
+  public ApexPythonEngine getApexPythonEngine()
+  {
+    return this.apexPythonEngine;
+  }
+
+  /***
+   * @param apexPythonEngine the python engine this operator should use
+   */
+  public void setApexPythonEngine(ApexPythonEngine apexPythonEngine)
+  {
+    this.apexPythonEngine = apexPythonEngine;
+  }
+
+  /***
+   * Override this method to perform meaningful work in the python interpreter space for every incoming tuple.
+   * The default implementation does nothing and returns null, which the input port counts as a null response.
+   * @param input The incoming tuple
+   * @param pythonEngineRef The implementation of the python interpreter that can be used to execute python commands
+   * @return The result of the execution wrapped as a PythonRequestResponse object, or null if there is none
+   * @throws ApexPythonInterpreterException if interrupted or if no workers are available to execute the python code
+   */
+  public PythonRequestResponse processPythonCodeForIncomingTuple(T input, ApexPythonEngine pythonEngineRef)
+      throws ApexPythonInterpreterException
+  {
+    // No python work is defined at this level; subclasses supply it
+    return null;
+  }
+
+  /***
+   * Builds the configuration map that setup() passes to the python engine before starting the interpreter. See the
+   * constants defined in {@link PythonInterpreterConfig} for the available keys. The values come from this
+   * operator's properties, which can be set e.g. in the application properties xml file. Override this method to
+   * refine the configuration used to start the interpreter instance; the test application in the test code shows
+   * example usage.
+   * <ul>
+   *   <li>PYTHON_INCLUDE_PATHS / PYTHON_SHARED_LIBS: the comma separated interpreterEngineIncludePaths and
+   *   interpreterEngineSharedLibNames properties, split into a {@code List<String>}. Entries are trimmed and
+   *   empty entries are dropped, so "a, b" yields ["a", "b"]. A key is omitted when its property is null.</li>
+   *   <li>IDLE_INTERPRETER_SPIN_POLICY, REQUEST_QUEUE_WAIT_SPIN_POLICY and SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS:
+   *   always taken from the corresponding properties.</li>
+   * </ul>
+   * @return a new, mutable map of interpreter configuration values
+   */
+  public Map<PythonInterpreterConfig,Object> getPreInitConfigurations()
+  {
+    Map<PythonInterpreterConfig,Object> configsForInterpreter = new HashMap<>();
+    String includePaths = getInterpreterEngineIncludePaths();
+    if (includePaths != null) {
+      configsForInterpreter.put(PythonInterpreterConfig.PYTHON_INCLUDE_PATHS, splitCommaSeparatedValues(includePaths));
+    }
+    String sharedLibs = getInterpreterEngineSharedLibNames();
+    if (sharedLibs != null) {
+      configsForInterpreter.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, splitCommaSeparatedValues(sharedLibs));
+    }
+    configsForInterpreter.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, getIdleInterpreterSpinPolicy());
+    configsForInterpreter.put(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY, getRequestQueueWaitSpinPolicy());
+    configsForInterpreter.put(PythonInterpreterConfig.SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS,
+        getSleepTimeInMSInCaseOfNoRequests());
+    return configsForInterpreter;
+  }
+
+  /***
+   * Splits a comma separated configuration value into its trimmed, non-empty entries. Without trimming, a value
+   * such as "numpy, pandas" would yield the module name " pandas".
+   * @param commaSeparatedValues the raw property value; must not be null
+   * @return a new mutable list of the entries in their original order
+   */
+  private static List<String> splitCommaSeparatedValues(String commaSeparatedValues)
+  {
+    List<String> values = new ArrayList<>();
+    for (String aValue : commaSeparatedValues.split(",")) {
+      String trimmed = aValue.trim();
+      if (!trimmed.isEmpty()) {
+        values.add(trimmed);
+      }
+    }
+    return values;
+  }
+
+  /***
+   * @return the time the python engine sleeps after starting the interpreter (passed to the engine when it is
+   *     created)
+   */
+  public long getSleepTimeDuringInterpreterBoot()
+  {
+    return this.sleepTimeDuringInterpreterBoot;
+  }
+
+  /***
+   * @param sleepTimeDuringInterpreterBoot the time the python engine sleeps after starting the interpreter; tune
+   *     it to the libraries that are loaded at boot
+   */
+  public void setSleepTimeDuringInterpreterBoot(long sleepTimeDuringInterpreterBoot)
+  {
+    this.sleepTimeDuringInterpreterBoot = sleepTimeDuringInterpreterBoot;
+  }
+
+  /***
+   * @return the number of worker threads the python engine is created with
+   */
+  public int getWorkerThreadPoolSize()
+  {
+    return this.workerThreadPoolSize;
+  }
+
+  /***
+   * @param workerThreadPoolSize the number of worker threads the python engine is created with
+   */
+  public void setWorkerThreadPoolSize(int workerThreadPoolSize)
+  {
+    this.workerThreadPoolSize = workerThreadPoolSize;
+  }
+
+  /***
+   * @return the type of partitioner to create for dynamic partitioning
+   */
+  public PythonExecutionPartitionerType getPartitionerType()
+  {
+    return this.partitionerType;
+  }
+
+  /***
+   * @param partitionerType the type of partitioner to create for dynamic partitioning
+   */
+  public void setPartitionerType(PythonExecutionPartitionerType partitionerType)
+  {
+    this.partitionerType = partitionerType;
+  }
+
+  /***
+   * @return the number of tuples received since the last completed checkpoint
+   */
+  public long getNumberOfRequestsProcessedPerCheckpoint()
+  {
+    return this.numberOfRequestsProcessedPerCheckpoint;
+  }
+
+  /***
+   * @param numberOfRequestsProcessedPerCheckpoint the number of tuples received since the last completed checkpoint
+   */
+  public void setNumberOfRequestsProcessedPerCheckpoint(long numberOfRequestsProcessedPerCheckpoint)
+  {
+    this.numberOfRequestsProcessedPerCheckpoint = numberOfRequestsProcessedPerCheckpoint;
+  }
+
+  /***
+   * @return the partitioner, or null if it has not been created or set yet
+   */
+  public AbstractPythonExecutionPartitioner getPartitioner()
+  {
+    return this.partitioner;
+  }
+
+  /***
+   * Sets the partitioner explicitly (acts as the setter counterpart of {@link #getPartitioner()}).
+   * @param partitioner the partitioner to use
+   */
+  public void initPartitioner(AbstractPythonExecutionPartitioner partitioner)
+  {
+    this.partitioner = partitioner;
+  }
+
+  /***
+   * @return the starvation percentage threshold consulted before spawning a new operator instance
+   */
+  public float getStarvationPercentBeforeSpawningNewInstance()
+  {
+    return this.starvationPercentBeforeSpawningNewInstance;
+  }
+
+  /***
+   * @param starvationPercentBeforeSpawningNewInstance the starvation percentage threshold consulted before spawning
+   *     a new operator instance
+   */
+  public void setStarvationPercentBeforeSpawningNewInstance(float starvationPercentBeforeSpawningNewInstance)
+  {
+    this.starvationPercentBeforeSpawningNewInstance = starvationPercentBeforeSpawningNewInstance;
+  }
+
+  /***
+   * @return the starved-return count captured at the last beforeCheckpoint()
+   */
+  public long getNumStarvedReturns()
+  {
+    return this.numStarvedReturnsPerCheckpoint;
+  }
+
+  /***
+   * @param numStarvedReturns the starved-return count for the current checkpoint period
+   */
+  public void setNumStarvedReturns(long numStarvedReturns)
+  {
+    this.numStarvedReturnsPerCheckpoint = numStarvedReturns;
+  }
+
+  /***
+   * @return the live (not copied) list of python commands captured at the last beforeCheckpoint()
+   */
+  public List<PythonRequestResponse> getAccumulatedCommandHistory()
+  {
+    return this.accumulatedCommandHistory;
+  }
+
+  /***
+   * @param accumulatedCommandHistory the list of python commands to replay on new instances; used as is
+   */
+  public void setAccumulatedCommandHistory(List<PythonRequestResponse> accumulatedCommandHistory)
+  {
+    this.accumulatedCommandHistory = accumulatedCommandHistory;
+  }
+
+  /***
+   * @return whether endWindow() blocks until all stragglers of the window have been emitted
+   */
+  public boolean isBlockAtEndOfWindowForStragglers()
+  {
+    return this.blockAtEndOfWindowForStragglers;
+  }
+
+  /***
+   * @param blockAtEndOfWindowForStragglers whether endWindow() blocks until all stragglers have been emitted
+   */
+  public void setBlockAtEndOfWindowForStragglers(boolean blockAtEndOfWindowForStragglers)
+  {
+    this.blockAtEndOfWindowForStragglers = blockAtEndOfWindowForStragglers;
+  }
+
+  /***
+   * @return the comma separated include paths for the interpreter, or null
+   */
+  public String getInterpreterEngineIncludePaths()
+  {
+    return this.interpreterEngineIncludePaths;
+  }
+
+  /***
+   * @param interpreterEngineIncludePaths comma separated include paths for the interpreter
+   */
+  public void setInterpreterEngineIncludePaths(String interpreterEngineIncludePaths)
+  {
+    this.interpreterEngineIncludePaths = interpreterEngineIncludePaths;
+  }
+
+  /***
+   * @return the comma separated names of the modules to register as shared libraries, or null
+   */
+  public String getInterpreterEngineSharedLibNames()
+  {
+    return this.interpreterEngineSharedLibNames;
+  }
+
+  /***
+   * @param interpreterEngineSharedLibNames comma separated names of modules to register as shared libraries
+   */
+  public void setInterpreterEngineSharedLibNames(String interpreterEngineSharedLibNames)
+  {
+    this.interpreterEngineSharedLibNames = interpreterEngineSharedLibNames;
+  }
+
+  /***
+   * @return the name of the spin policy used by idle interpreter threads
+   */
+  public String getIdleInterpreterSpinPolicy()
+  {
+    return this.idleInterpreterSpinPolicy;
+  }
+
+  /***
+   * @param idleInterpreterSpinPolicy the name of the spin policy used by idle interpreter threads
+   */
+  public void setIdleInterpreterSpinPolicy(String idleInterpreterSpinPolicy)
+  {
+    this.idleInterpreterSpinPolicy = idleInterpreterSpinPolicy;
+  }
+
+  /***
+   * @return the name of the spin policy used while waiting on the request queue
+   */
+  public String getRequestQueueWaitSpinPolicy()
+  {
+    return this.requestQueueWaitSpinPolicy;
+  }
+
+  /***
+   * @param requestQueueWaitSpinPolicy the name of the spin policy used while waiting on the request queue
+   */
+  public void setRequestQueueWaitSpinPolicy(String requestQueueWaitSpinPolicy)
+  {
+    this.requestQueueWaitSpinPolicy = requestQueueWaitSpinPolicy;
+  }
+
+  /***
+   * @return the sleep time in milliseconds used by interpreter threads when there are no requests
+   */
+  public int getSleepTimeInMSInCaseOfNoRequests()
+  {
+    return this.sleepTimeInMSInCaseOfNoRequests;
+  }
+
+  /***
+   * @param sleepTimeInMSInCaseOfNoRequests the sleep time in milliseconds used when there are no requests
+   */
+  public void setSleepTimeInMSInCaseOfNoRequests(int sleepTimeInMSInCaseOfNoRequests)
+  {
+    this.sleepTimeInMSInCaseOfNoRequests = sleepTimeInMSInCaseOfNoRequests;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java b/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java
new file mode 100644
index 0000000..f904d98
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.python.base.jep.JepPythonEngine;
+
+/***
+ * Keys of the configuration map passed to the interpreter before it boots. See
+ * {@link JepPythonEngine#preInitInterpreter(Map)}.
+ */
+public enum PythonInterpreterConfig
+{
+  /** Collection of include paths added to the interpreter. */
+  PYTHON_INCLUDE_PATHS,
+  /** Collection of module names registered as shared modules (e.g. numpy). */
+  PYTHON_SHARED_LIBS,
+  /** Name of the spin policy used by idle interpreter threads. */
+  IDLE_INTERPRETER_SPIN_POLICY,
+  /** Name of the spin policy used while waiting on the request queue. */
+  REQUEST_QUEUE_WAIT_SPIN_POLICY,
+  /** Numeric sleep time in milliseconds used when no requests are pending. */
+  SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java b/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java
new file mode 100644
index 0000000..e024ab0
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+
+/***
+ * <p>Specifies whether an API invocation on the in-memory interpreter runs on all worker threads, on a sticky
+ * thread or on any single thread.</p>
+ * <ul>
+ *   <li>{@link WorkerExecutionMode#BROADCAST}: use when the command produces interpreter state that subsequent
+ *   calls depend on. For example, deserializing a machine learning model can be a BROADCAST command so that
+ *   scoring can then be invoked on any worker thread.</li>
+ *   <li>{@link WorkerExecutionMode#ANY}: any worker may execute the code, e.g. scoring an incoming tuple once the
+ *   model has already been deserialized on all workers.</li>
+ *   <li>{@link WorkerExecutionMode#STICKY}: the same worker must service the request. The downside is that the
+ *   request may or may not complete on time, depending on that worker's queue length.</li>
+ * </ul>
+ * <p><b>Ensure {@link PythonInterpreterRequest#hashCode()} is overridden if STICKY is chosen.</b></p>
+ */
+public enum WorkerExecutionMode
+{
+  /** Run on every worker thread. */
+  BROADCAST,
+  /** Run on any one worker thread. */
+  ANY,
+  /** Run on the same worker thread (see the hashCode note above). */
+  STICKY
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java
new file mode 100644
index 0000000..75fa552
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import jep.Jep;
+import jep.JepConfig;
+import jep.JepException;
+import jep.NDArray;
+
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.PYTHON_INCLUDE_PATHS;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.PYTHON_SHARED_LIBS;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS;
+
+/**
+ * <p>
+ * Represents a python interpreter instance embedded in JVM memory using the JEP ( Java embedded Python ) engine.
+ * JEP uses JNI wrapper around the embedded Python instance. JEP mandates that the thread that created the JEP
+ * instance is the only thread that can perform method calls on the embedded interpreter. This requires
+ * the Apex operator implementation to decouple JEP execution logic from the operator processing main thread.
+ * <b>Note that this embedded python is an interpreter and this essentially means the state of the interpreter
+ * is maintained across all calls to the interpreter.</b>
+ * The threaded implementation provides the following main functionalities
+ * <ol>
+ * <li>An evaluation expression that can interpret a string as a python command. The user can also set
+ * variable values that are
+ * <ul>
+ * <li>Transferred to the interpreter with the same variable names</li>
+ * <li>Garbage collected from the python interpreter space</li>
+ * </ul>
+ * </li>
+ * <li>
+ * A method call invocation wherein parameters can be passed to a previously defined method (the method must
+ * already have been defined, for example via an eval expression or a previous execute script call)
+ * </li>
+ * <li>A script call command that can execute a script. There is currently no support to pass params to scripts</li>
+ * <li>A handy mechanism to execute a series of commands. Note that this is a simple wrapper around the
+ * eval expression. The main difference is that no user variable substitution is performed in
+ * this model. This is useful for statements like import (e.g. import numpy as np) which do not require
+ * user variable conversion</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * The logic is executed using a request and response queue model. The thread keeps consuming from a request queue
+ * and submits results to a response queue.
+ * </p>
+ * <p>
+ * Note that all outputs are redirected to the standard logger. Hence statements like print(secret)
+ * should be avoided, as the output of the print command is captured in the log.
+ * </p>
+ * <p>
+ * When using CPython extension libraries like numpy, <b>ensure you first register numpy as a shared library</b>
+ * before using it, even in import statements. Not doing so will result in very obscure errors.
+ * </p>
+ */
+
+public class InterpreterThread implements Runnable
+{
+ private static final Logger LOG = LoggerFactory.getLogger(InterpreterThread.class);
+
+ /* Name of the dynamically loaded JNI library; loaded via System.loadLibrary in loadMandatoryJVMLibraries() */
+ public static final String JEP_LIBRARY_NAME = "jep";
+
+ /* The string command which will be used to delete python variables after they are used. */
+ public static final String PYTHON_DEL_COMMAND = "del ";
+
+ /* The embedded interpreter. Created by startInterpreter() and closed by stopInterpreter(); null until created.
+  NOTE(review): public, mutable and named like a constant - kept as is since other classes may access it directly */
+ public transient Jep JEP_INSTANCE;
+
+ /* Used by the operator thread or other threads to mark the stopping of processing of the interpreter command loop.
+  Set to true by stopInterpreter() */
+ private transient volatile boolean isStopped = false;
+
+ /* Used to represent the current state of this thread whether it is currently busy executing a command */
+ private transient volatile boolean busyFlag = false;
+
+ /* Represents the default amount of time that this thread will wait to read a command from the request queue */
+ private long timeOutToPollFromRequestQueue = 1;
+
+ /* Presumably the unit applied to timeOutToPollFromRequestQueue (its usage is not visible here - confirm) */
+ private TimeUnit timeUnitsToPollFromRequestQueue = TimeUnit.MILLISECONDS;
+
+ /* Queue from which requests are consumed; supplied by the constructor */
+ private transient volatile BlockingQueue<PythonRequestResponse> requestQueue;
+
+ /* Queue into which responses are written; supplied by the constructor */
+ private transient volatile BlockingQueue<PythonRequestResponse> responseQueue;
+
+ /* An id that is used as the thread name in startInterpreter() and is useful while logging statements */
+ private String threadID;
+
+ /* Whether this thread should sleep for a few moments if there are no requests or keep checking the request queue.
+  Can be overridden through the IDLE_INTERPRETER_SPIN_POLICY config in startInterpreter() */
+ private SpinPolicy spinPolicy = SpinPolicy.SLEEP;
+
+ /* Holds the configs that are used to initialize the interpreter thread. Examples of config are shared libraries and
+ include paths for the interpreter. The key is one of the constants defined in PythonInterpreterConfig and value
+ is specific to the config type that is being set. Populated by preInitInterpreter().
+ */
+ private Map<PythonInterpreterConfig,Object> initConfigs = new HashMap<>();
+
+ /* Used as a flag to denote an error situation in the interpreter so that the next set of commands to run
+ * an empty/null eval expression to clear any erroneous state. Set by every command method on a JepException */
+ private boolean errorEncountered = false;
+
+ /* Sleep time used when there are no requests; can be overridden through the
+  SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS config in startInterpreter() */
+ private long sleepTimeMsInCaseOfNoRequests = 1;
+
+  /***
+   * Creates an interpreter thread bound to the given queues. The interpreter itself is not started here; that
+   * happens in {@link #startInterpreter()}.
+   * @param requestQueue The queue from which requests will be processed
+   * @param responseQueue The queue into which the responses will be written
+   * @param threadID An identifier used as the thread name and in log statements
+   */
+  public InterpreterThread(BlockingQueue<PythonRequestResponse> requestQueue,
+      BlockingQueue<PythonRequestResponse> responseQueue, String threadID)
+  {
+    this.threadID = threadID;
+    this.requestQueue = requestQueue;
+    this.responseQueue = responseQueue;
+  }
+
+  /***
+   * Loads the JEP dynamic library so that the JVM can use the JNI bridge into the interpreter. The current
+   * {@code java.library.path} is logged first to help diagnose loading problems.
+   * @throws ApexPythonInterpreterException if the library could not be loaded or located
+   */
+  private void loadMandatoryJVMLibraries() throws ApexPythonInterpreterException
+  {
+    LOG.info("Java library path being used for Interpreted ID " + threadID + " " +
+        System.getProperty("java.library.path"));
+    try {
+      System.loadLibrary(JEP_LIBRARY_NAME);
+    } catch (UnsatisfiedLinkError e) {
+      // A missing or incompatible native library is reported as an Error, which catch (Exception) never caught.
+      // Wrap it in an Exception so the existing exception constructor applies and callers get the documented type.
+      throw new ApexPythonInterpreterException(
+          new Exception("Unable to load the native library " + JEP_LIBRARY_NAME, e));
+    } catch (Exception e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    LOG.info("JEP library loaded successfully");
+  }
+
+
+  /***
+   * Returns the embedded Jep instance, or null if none has been created yet.
+   * @return the embedded interpreter instance
+   * @throws ApexPythonInterpreterException declared for API compatibility; never thrown by this implementation
+   */
+  public Jep getEngineReference() throws ApexPythonInterpreterException
+  {
+    Jep engine = JEP_INSTANCE;
+    return engine;
+  }
+
+  /***
+   * Registers the configs that startInterpreter() applies when the interpreter is created. Nothing is started
+   * here; entries are merged into previously registered configs, later values overwriting earlier ones.
+   * @param preInitConfigs Config values keyed by {@link PythonInterpreterConfig}
+   * @throws ApexPythonInterpreterException declared for API compatibility
+   */
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
+      throws ApexPythonInterpreterException
+  {
+    for (Map.Entry<PythonInterpreterConfig, Object> aConfig : preInitConfigs.entrySet()) {
+      initConfigs.put(aConfig.getKey(), aConfig.getValue());
+    }
+  }
+
+ /***
+ * Starts the interpreter by loading the shared libraries
+ * @throws ApexPythonInterpreterException if the interpreter could not be started
+ */
+ public void startInterpreter() throws ApexPythonInterpreterException
+ {
+ Thread.currentThread().setName(threadID);
+ Thread.currentThread().setPriority(Thread.MAX_PRIORITY); // To allow for time aware calls
+ loadMandatoryJVMLibraries();
+ JepConfig config = new JepConfig()
+ .setRedirectOutputStreams(true)
+ .setInteractive(false)
+ .setClassLoader(Thread.currentThread().getContextClassLoader()
+ );
+ if (initConfigs.containsKey(PYTHON_INCLUDE_PATHS)) {
+ List<String> includePaths = (List<String>)initConfigs.get(PYTHON_INCLUDE_PATHS);
+ if ( includePaths != null) {
+ LOG.info("Adding include path for the in-memory interpreter instance");
+ for (String anIncludePath: includePaths) {
+ config.addIncludePaths(anIncludePath);
+ }
+ }
+ }
+ if (initConfigs.containsKey(PYTHON_SHARED_LIBS)) {
+ Set<String> sharedLibs = (Set<String>)initConfigs.get(PYTHON_SHARED_LIBS);
+ if ( sharedLibs != null) {
+ config.setSharedModules(sharedLibs);
+ LOG.info("Loaded " + sharedLibs.size() + " shared libraries as config");
+ }
+ } else {
+ LOG.info(" No shared libraries loaded");
+ }
+ if (initConfigs.containsKey(IDLE_INTERPRETER_SPIN_POLICY)) {
+ spinPolicy = SpinPolicy.valueOf((String)initConfigs.get(IDLE_INTERPRETER_SPIN_POLICY));
+ LOG.debug("Configuring spin policy to be " + spinPolicy);
+ }
+ if (initConfigs.containsKey(SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS)) {
+ sleepTimeMsInCaseOfNoRequests = (Long)initConfigs.get(SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS);
+ LOG.debug("Configuring sleep time for no requests situation to be " + sleepTimeMsInCaseOfNoRequests);
+ }
+ try {
+ LOG.info("Launching the in-memory interpreter");
+ JEP_INSTANCE = new Jep(config);
+ } catch (JepException e) {
+ LOG.error(e.getMessage(),e); // Purposefully logging as this will help in startup issues being captured inline
+ throw new ApexPythonInterpreterException(e);
+ }
+ }
+
+  /***
+   * Runs interpreter commands one after the other. No parameters can be passed from the JVM into the python
+   * interpreter space. Execution stops at the first command that raises an error; that command is recorded as
+   * failed and the error flag is set.
+   * @param commands The commands to execute sequentially
+   * @return A map from each executed command to the boolean outcome reported by the interpreter (false for the
+   *         command that failed)
+   */
+  private Map<String,Boolean> runCommands(List<String> commands)
+  {
+    LOG.debug("Executing run commands");
+    Map<String,Boolean> resultsOfExecution = new HashMap<>();
+    for (String aCommand : commands) {
+      LOG.debug("Executing command " + aCommand);
+      boolean succeeded;
+      try {
+        succeeded = JEP_INSTANCE.eval(aCommand);
+      } catch (JepException e) {
+        resultsOfExecution.put(aCommand, Boolean.FALSE);
+        errorEncountered = true;
+        LOG.error("Error while running command " + aCommand, e);
+        break;
+      }
+      resultsOfExecution.put(aCommand, succeeded);
+    }
+    return resultsOfExecution;
+  }
+
+  /***
+   * Invokes a global python method, passing the parameters in list order. Each parameter that is an
+   * {@link NDimensionalArray} is converted into a JEP NDArray; all other parameters are passed unchanged.
+   * @param nameOfGlobalMethod Name of the method to invoke
+   * @param argsToGlobalMethod Arguments to the method call, or null/empty for none. Typecasting is interpreted at
+   *                           runtime, so multiple types can be sent as part of the parameter list
+   * @param type The class of the return value. In some cases the interpreter returns the widest type; for example
+   *             the addition of two ints might come back as a Long
+   * @param <T> Represents the type of the return value
+   * @return The value returned by the python method, or null if the invocation failed
+   */
+  private <T> T executeMethodCall(String nameOfGlobalMethod, List<Object> argsToGlobalMethod, Class<T> type)
+  {
+    LOG.debug("Executing method call invocation");
+    try {
+      Object[] paramsToPass;
+      if ((argsToGlobalMethod != null) && (!argsToGlobalMethod.isEmpty())) {
+        List<Object> modifiedParams = new ArrayList<>(argsToGlobalMethod.size());
+        for (Object aMethodParam : argsToGlobalMethod) {
+          // Inspect every parameter: a call may mix NumPy arrays with plain Java values
+          if (aMethodParam instanceof NDimensionalArray) {
+            LOG.debug(aMethodParam + " is of type NDimensional array and hence converting to JEP NDArray");
+            modifiedParams.add(((NDimensionalArray)aMethodParam).toNDArray());
+          } else {
+            modifiedParams.add(aMethodParam);
+          }
+        }
+        paramsToPass = modifiedParams.toArray();
+        LOG.debug("Executing method " + nameOfGlobalMethod + " with " + paramsToPass.length + " parameters");
+      } else {
+        paramsToPass = new Object[0];
+        LOG.debug("Executing " + nameOfGlobalMethod + " with no parameters");
+      }
+      return type.cast(JEP_INSTANCE.invoke(nameOfGlobalMethod, paramsToPass));
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while executing method " + nameOfGlobalMethod, e);
+    }
+    return null;
+  }
+
+  /***
+   * Runs the python script located at the given path. Failures are logged and set the error flag.
+   * @param pathToScript The path to the script
+   * @return true if the script ran without raising an error, false otherwise
+   */
+  private boolean executeScript(String pathToScript)
+  {
+    LOG.debug("Executing script at path " + pathToScript);
+    boolean executed = false;
+    try {
+      JEP_INSTANCE.runScript(pathToScript);
+      executed = true;
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error(" Error while executing script " + pathToScript, e);
+    }
+    return executed;
+  }
+
+  /***
+   * Evaluates a string expression after passing any substitution variables into the interpreter space.
+   * <p>After a successful evaluation the requested variable (if any) is read back, then all substituted variables
+   * are deleted from the interpreter space. The extracted variable is deleted too when requested. Errors at any
+   * stage are logged, set the error flag and make this method return null.</p>
+   * @param command The string equivalent of the command
+   * @param variableToExtract The name of the variable to copy from the python interpreter space to the JVM space,
+   *                          or null if nothing needs to be extracted
+   * @param variableSubstituionParams Key value pairs representing the variables that need to be passed into the
+   *                                  interpreter space and are part of the eval expression. May be null, which is
+   *                                  treated like an empty map.
+   * @param deleteExtractedVariable Whether the extracted variable (the second parameter) should be deleted from
+   *                                the interpreter space once it has been read
+   * @param expectedReturnType Class representing the expected return type
+   * @param <T> Template signature for the expected return type
+   * @return The value extracted from the interpreter space, with NumPy arrays converted into
+   *         {@link NDimensionalArray} instances; null if no variable was requested or if any error occurred
+   */
+  private <T> T eval(String command, String variableToExtract, Map<String, Object> variableSubstituionParams,
+      boolean deleteExtractedVariable, Class<T> expectedReturnType)
+  {
+    T variableToReturn = null;
+    LOG.debug("Executing eval expression " + command + " with return type : " + expectedReturnType);
+    // A request without substitutions previously failed with an uncaught NullPointerException
+    Map<String, Object> substitutionParams = (variableSubstituionParams != null) ? variableSubstituionParams :
+        new HashMap<String, Object>();
+    try {
+      for (Map.Entry<String, Object> aParam : substitutionParams.entrySet()) {
+        Object keyVal = aParam.getValue();
+        if (keyVal instanceof NDimensionalArray) {
+          keyVal = ((NDimensionalArray)keyVal).toNDArray();
+        }
+        JEP_INSTANCE.set(aParam.getKey(), keyVal);
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while setting the params for eval expression " + command, e);
+      return null;
+    }
+    try {
+      LOG.debug("Executing the eval expression in the interpreter instance " + command);
+      JEP_INSTANCE.eval(command);
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while evaluating the expression " + command, e);
+      return null;
+    }
+    try {
+      if (variableToExtract != null) {
+        Object extractedVariable = JEP_INSTANCE.getValue(variableToExtract);
+        if (extractedVariable instanceof NDArray) {
+          LOG.debug(" Return type is a NumPy Array. Hence converting to NDimensionalArray instance");
+          variableToReturn = expectedReturnType.cast(toNDimensionalArray((NDArray)extractedVariable));
+        } else {
+          variableToReturn = expectedReturnType.cast(extractedVariable);
+        }
+        if (deleteExtractedVariable) {
+          LOG.debug("Deleting the extracted variable from the Python interpreter space");
+          JEP_INSTANCE.eval(PYTHON_DEL_COMMAND + variableToExtract);
+        }
+      }
+      LOG.debug("Deleting all the variables from the python interpreter space ");
+      for (String aKey : substitutionParams.keySet()) {
+        LOG.debug("Deleting " + aKey);
+        JEP_INSTANCE.eval(PYTHON_DEL_COMMAND + aKey);
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while evaluating delete part of expression " + command, e);
+      return null;
+    }
+    return variableToReturn;
+  }
+
+  /***
+   * Copies a JEP NDArray obtained from the interpreter into an {@link NDimensionalArray}, including its data,
+   * dimensions and the total element count across all dimensions.
+   * @param ndArrayJepVal The array returned by the interpreter
+   * @return A new NDimensionalArray holding the same data
+   */
+  private static NDimensionalArray toNDimensionalArray(NDArray ndArrayJepVal)
+  {
+    NDimensionalArray nDimArray = new NDimensionalArray();
+    nDimArray.setData(ndArrayJepVal.getData());
+    // NOTE(review): setSignedFlag receives isUnsigned(). Left unchanged because NDimensionalArray#toNDArray (not
+    // visible here) may depend on this convention for round trips - confirm before inverting.
+    nDimArray.setSignedFlag(ndArrayJepVal.isUnsigned());
+    int[] dimensions = ndArrayJepVal.getDimensions();
+    nDimArray.setDimensions(dimensions);
+    int lengthInOneDimension = 1;
+    for (int aDimension : dimensions) {
+      lengthInOneDimension *= aDimension;
+    }
+    nDimArray.setLengthOfSequentialArray(lengthInOneDimension);
+    return nDimArray;
+  }
+
+ /***
+ * Stops the interpreter as requested from the operator/main thread
+ * @throws ApexPythonInterpreterException if not able to stop the
+ */
+ public void stopInterpreter() throws ApexPythonInterpreterException
+ {
+ isStopped = true;
+ LOG.info("Attempting to close the interpreter thread");
+ try {
+ JEP_INSTANCE.close();
+ } catch (Exception e) {
+ LOG.error("Error while stopping the interpreter thread ", e);
+ throw new ApexPythonInterpreterException(e);
+ }
+ LOG.info("Interpreter closed");
+ }
+
+ /***
+ * Responsible for polling the request queue and formatting the request payload to make it compatible to the
+ * individual processing logic of the functionalities provided by the interpreter API methods.
+ * @param <T> Java templating signature enforcement
+ * @throws ApexPythonInterpreterException if an unrecognized command is issued.
+ * @throws InterruptedException if interrupted while trying to wait for a request from request queue
+ */
+ private <T> void processCommand() throws ApexPythonInterpreterException, InterruptedException
+ {
+
+ PythonRequestResponse requestResponseHandle = requestQueue.poll(timeOutToPollFromRequestQueue,
+ timeUnitsToPollFromRequestQueue);
+ if (requestResponseHandle != null) {
+ LOG.debug("Processing command " + requestResponseHandle.getPythonInterpreterRequest().getCommandType());
+ busyFlag = true;
+ if (errorEncountered) {
+ LOG.debug("Error state detected from a previous command. Resetting state to the previous" +
+ " state of the error");
+ try {
+ JEP_INSTANCE.eval(null);
+ errorEncountered = false;
+ } catch (JepException e) {
+ LOG.error("Error while trying to clear the state of the interpreter due to previous command" +
+ " " + e.getMessage(), e);
+ }
+ }
+ PythonInterpreterRequest<T> request =
+ requestResponseHandle.getPythonInterpreterRequest();
+ PythonInterpreterResponse<T> response =
+ requestResponseHandle.getPythonInterpreterResponse();
+ Map<String,Boolean> commandStatus = new HashMap<>(1);
+ switch (request.getCommandType()) {
+ case EVAL_COMMAND:
+ EvalCommandRequestPayload evalPayload = request.getEvalCommandRequestPayload();
+ T responseVal = eval(evalPayload.getEvalCommand(), evalPayload.getVariableNameToExtractInEvalCall(),
+ evalPayload.getParamsForEvalCommand(), evalPayload.isDeleteVariableAfterEvalCall(),
+ request.getExpectedReturnType());
+ response.setResponse(responseVal);
+ if (responseVal != null) {
+ commandStatus.put(evalPayload.getEvalCommand(),Boolean.TRUE);
+ } else {
+ commandStatus.put(evalPayload.getEvalCommand(),Boolean.FALSE);
+ }
+ response.setCommandStatus(commandStatus);
+ break;
+ case SCRIPT_COMMAND:
+ ScriptExecutionRequestPayload scriptPayload = request.getScriptExecutionRequestPayload();
+ if (executeScript(scriptPayload.getScriptName())) {
+ commandStatus.put(scriptPayload.getScriptName(),Boolean.TRUE);
+ } else {
+ commandStatus.put(scriptPayload.getScriptName(),Boolean.FALSE);
+ }
+ response.setCommandStatus(commandStatus);
+ break;
+ case METHOD_INVOCATION_COMMAND:
+ MethodCallRequestPayload requestpayload = request.getMethodCallRequest();
+ response.setResponse(executeMethodCall(
+ requestpayload.getNameOfMethod(), requestpayload.getArgs(), request.getExpectedReturnType()));
+ if (response.getResponse() == null) {
+ commandStatus.put(requestpayload.getNameOfMethod(), Boolean.FALSE);
+ } else {
+ commandStatus.put(requestpayload.getNameOfMethod(), Boolean.TRUE);
+ }
+ response.setCommandStatus(commandStatus);
+ break;
+ case GENERIC_COMMANDS:
+ response.setCommandStatus(runCommands(request.getGenericCommandsRequestPayload().getGenericCommands()));
+ break;
+ default:
+ throw new ApexPythonInterpreterException(new Exception("Unspecified Interpreter command"));
+ }
+ requestResponseHandle.setRequestCompletionTime(System.currentTimeMillis());
+ responseQueue.put(requestResponseHandle);
+ LOG.debug("Submitted the response and executed " + response.getCommandStatus().size() + " instances of command");
+ }
+ busyFlag = false;
+ }
+
+ /***
+ * Starts the interpreter as soon as the thread starts running. This is due to the limitation of JEP which stipulates
+ * that the thread which started the interpreter can only issue subsequent calls/invocations. This is due to JNI
+ * limitations. The thread then tries to consume from the request queue and process them. If there are no requests
+ * present then the thread can possibly go to sleep based on the {@link SpinPolicy} configured. The spin policy
+ * is passed in as the pre init configurations. See {@link PythonInterpreterConfig} for more details
+ */
+ @Override
+ public void run()
+ {
+ LOG.info("Starting the execution of Interpreter thread ");
+ if (JEP_INSTANCE == null) {
+ LOG.info("Initializaing the interpreter state");
+ startInterpreter();
+ LOG.info("Successfully initialized the interpreter");
+ }
+ while (!isStopped) {
+ if ( (requestQueue.isEmpty()) && (spinPolicy == SpinPolicy.SLEEP)) {
+ LOG.debug("Sleeping the current thread as there are no more requests to process from the queue");
+ try {
+ Thread.sleep(sleepTimeMsInCaseOfNoRequests);
+ continue;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ try {
+ processCommand();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ LOG.info("Stop condition detected for this thread. Stopping the in-memory interpreter now...");
+ stopInterpreter();
+ }
+
+ public Jep getJEP_INSTANCE()
+ {
+ return JEP_INSTANCE;
+ }
+
+ public void setJEP_INSTANCE(Jep JEP_INSTANCE)
+ {
+ this.JEP_INSTANCE = JEP_INSTANCE;
+ }
+
+ public long getTimeOutToPollFromRequestQueue()
+ {
+ return timeOutToPollFromRequestQueue;
+ }
+
+ public void setTimeOutToPollFromRequestQueue(long timeOutToPollFromRequestQueue)
+ {
+ this.timeOutToPollFromRequestQueue = timeOutToPollFromRequestQueue;
+ }
+
+ public TimeUnit getTimeUnitsToPollFromRequestQueue()
+ {
+ return timeUnitsToPollFromRequestQueue;
+ }
+
+ public void setTimeUnitsToPollFromRequestQueue(TimeUnit timeUnitsToPollFromRequestQueue)
+ {
+ this.timeUnitsToPollFromRequestQueue = timeUnitsToPollFromRequestQueue;
+ }
+
+ public boolean isStopped()
+ {
+ return isStopped;
+ }
+
+ public void setStopped(boolean stopped)
+ {
+ isStopped = stopped;
+ }
+
+ public BlockingQueue<PythonRequestResponse> getRequestQueue()
+ {
+ return requestQueue;
+ }
+
+ public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
+ {
+ this.requestQueue = requestQueue;
+ }
+
+ public BlockingQueue<PythonRequestResponse> getResponseQueue()
+ {
+ return responseQueue;
+ }
+
+ public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
+ {
+ this.responseQueue = responseQueue;
+ }
+
+ public Map<PythonInterpreterConfig, Object> getInitConfigs()
+ {
+ return initConfigs;
+ }
+
+ public void setInitConfigs(Map<PythonInterpreterConfig, Object> initConfigs)
+ {
+ this.initConfigs = initConfigs;
+ }
+
+ public boolean isBusy()
+ {
+ boolean busyState = busyFlag;
+ if (!requestQueue.isEmpty()) { // This is required because interpreter thread goes to a 1 ms sleep to allow other
+ // threads work when checking the queue for request availability. Hence busy state flag need not necessarily
+ // be updated in this sleep window even though if there is a pending request
+ busyState = true;
+ }
+ return busyState;
+ }
+
+ public void setBusy(boolean busy)
+ {
+ busyFlag = busy;
+ }
+
+ public SpinPolicy getSpinPolicy()
+ {
+ return spinPolicy;
+ }
+
+ public void setSpinPolicy(SpinPolicy spinPolicy)
+ {
+ this.spinPolicy = spinPolicy;
+ }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java
new file mode 100644
index 0000000..d969b19
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+/***
+ * Wraps around the interpreter thread so that time bound SLAs can be implemented for python based execution
+ * This class primarily implements the time constraints by utilizing the {@link InterpreterThread} class and using
+ * a Disruptor blocking queue for high throughput. Utilizes an executor service to implement the timing SLAs.
+ */
+public class InterpreterWrapper
+{
+ private static final Logger LOG = LoggerFactory.getLogger(InterpreterWrapper.class);
+
+ /* Reference to the interpreter thread which executes requests in memory */
+ private transient InterpreterThread interpreterThread;
+
+ /* Spin policy to use for the disruptor implementation */
+ private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+ private int bufferCapacity = 16; // Represents the number of workers and response queue sizes
+
+ private String interpreterId;
+
+ /* Represents the actual thread instance running under the Executor service */
+ private transient Future<?> handleToJepRunner;
+
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ private transient BlockingQueue<PythonRequestResponse> requestQueue;
+ private transient BlockingQueue<PythonRequestResponse> responseQueue;
+
+ /* Represents the queue into which all of the stragglers will be pushed into by the interpreter thread */
+ private transient volatile BlockingQueue<PythonRequestResponse> delayedResponsesQueue;
+
+ /***
+ * Constructs the interpreter wrapper instance.
+ * @param interpreterId A string that can be used to represent the interpreter id that is passed onto the actual
+ * thread that is executing the commands
+ *
+ * @param delayedResponsesQueueRef The queue into which all of the straggler responses will end in
+ */
+ public InterpreterWrapper(String interpreterId,BlockingQueue<PythonRequestResponse> delayedResponsesQueueRef,
+ SpinPolicy spinPolicyForWaitingInRequestQueue)
+ {
+ delayedResponsesQueue = delayedResponsesQueueRef;
+ this.interpreterId = interpreterId;
+ this.cpuSpinPolicyForWaitingInBuffer = spinPolicyForWaitingInRequestQueue;
+ requestQueue = new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+ responseQueue = new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+ interpreterThread = new InterpreterThread(requestQueue,responseQueue,interpreterId);
+ }
+
+ /**
+ * Invokes the interpreter thread pre initialization logic
+ * @param preInitConfigs A set of key value pairs that are used to initialize the actual interpreter. See constants
+ * defined in {@link InterpreterThread} for a list of keys available
+ * @throws ApexPythonInterpreterException if the pre-initialization logic could not be executed for whatever reasons
+ */
+ public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs) throws ApexPythonInterpreterException
+ {
+ interpreterThread.preInitInterpreter(preInitConfigs);
+ }
+
+ /***
+ * Starts the actual interpreter thread to which this class is wrapping around by using an executor service
+ * @throws ApexPythonInterpreterException
+ */
+ public void startInterpreter() throws ApexPythonInterpreterException
+ {
+ handleToJepRunner = executorService.submit(interpreterThread);
+ }
+
+ /***
+ * Builds a response object for the incoming request object.
+ * @param req Represents the incoming request for which response needs to be generated for.
+ * @param windowId The Operator window ID ( used to choose the interpreter thread while choosing to execute the
+ * logic from a pool of worker threads )
+ * @param requestId The request ID perhaps coming from the base python operator. Only used to optimize the right
+ * interpreter lookup from a pool of worker interpreter threads.
+ * @param <T> The template of the return type
+ * @return An object of type {@link PythonRequestResponse} is returned which encompasses both request and response.
+ */
+ private <T> PythonRequestResponse<T> buildRequestRespObject(PythonInterpreterRequest<T> req,
+ long windowId,long requestId)
+ {
+ PythonRequestResponse<T> requestResponse = new PythonRequestResponse();
+ requestResponse.setPythonInterpreterRequest(req);
+ PythonInterpreterResponse<T> response = new PythonInterpreterResponse<>(req.getExpectedReturnType());
+ requestResponse.setPythonInterpreterResponse(response);
+ requestResponse.setRequestStartTime(System.currentTimeMillis());
+ requestResponse.setRequestId(requestId);
+ requestResponse.setWindowId(windowId);
+ return requestResponse;
+ }
+
+ /***
+ * Handles the common logic that is common across all methods of invocation of the in-memory interpreter. Some common
+ * logic includes draining any stragglers, matching the request to the any of the responses that arrive in the
+ * response queue possibly due to previous requests
+ * @param requestResponse The wrapper object into which
+ * @param req The request that contains the timeout SLAs
+ * @param <T> Java templating signature
+ * @return A response to the original incoming request, null if the response did not arrive within given SLA.
+ * @throws InterruptedException if interrupted while waiting for the response queue.
+ */
+ public <T> PythonRequestResponse<T> processRequest(PythonRequestResponse requestResponse,
+ PythonInterpreterRequest<T> req) throws InterruptedException
+ {
+ List<PythonRequestResponse> drainedResults = new ArrayList<>();
+ PythonRequestResponse currentRequestWithResponse = null;
+ boolean isCurrentRequestProcessed = false;
+ long timeOutInNanos = TimeUnit.NANOSECONDS.convert(req.getTimeout(),req.getTimeUnit());
+ // drain any previous responses that were returned while the Apex operator is processing
+ responseQueue.drainTo(drainedResults);
+ LOG.debug("Draining previous request responses if any " + drainedResults.size());
+ for (PythonRequestResponse oldRequestResponse : drainedResults) {
+ delayedResponsesQueue.put(oldRequestResponse);
+ }
+ // We first set a timer to see how long it actually it took for the response to arrive.
+ // It is possible that a response arrived due to a previous request and hence this need for the timer
+ // which tracks the time for the current request.
+ long currentStart = System.nanoTime();
+ long timeLeftToCompleteProcessing = timeOutInNanos;
+ while ( (!isCurrentRequestProcessed) && ( timeLeftToCompleteProcessing > 0 )) {
+ LOG.debug("Submitting the interpreter Request with time out in nanos as " + timeOutInNanos);
+ requestQueue.put(requestResponse);
+ // ensures we are blocked till the time limit
+ currentRequestWithResponse = responseQueue.poll(timeOutInNanos, TimeUnit.NANOSECONDS);
+ timeLeftToCompleteProcessing = timeLeftToCompleteProcessing - ( System.nanoTime() - currentStart );
+ currentStart = System.nanoTime();
+ if (currentRequestWithResponse != null) {
+ if ( (requestResponse.getRequestId() == currentRequestWithResponse.getRequestId()) &&
+ (requestResponse.getWindowId() == currentRequestWithResponse.getWindowId()) ) {
+ isCurrentRequestProcessed = true;
+ break;
+ } else {
+ delayedResponsesQueue.put(currentRequestWithResponse);
+ }
+ } else {
+ LOG.debug(" Processing of request could not be completed on time");
+ }
+ }
+ if (isCurrentRequestProcessed) {
+ LOG.debug("Response could be processed within time SLA");
+ return currentRequestWithResponse;
+ } else {
+ LOG.debug("Response could not be processed within time SLA");
+ return null;
+ }
+ }
+
+ /***
+ * Implements the time based SLA over the interpreters run commands implementation. See
+ * {@link InterpreterThread#runCommands(List)}
+ * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param request The payload of the request
+ * @return A response object with the results of the execution. Null if the request could not be processed on time.
+ * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+ * queue
+ */
+ public PythonRequestResponse<Void> runCommands(long windowId, long requestId,
+ PythonInterpreterRequest<Void> request) throws InterruptedException
+ {
+ request.setCommandType(PythonCommandType.GENERIC_COMMANDS);
+ PythonRequestResponse requestResponse = buildRequestRespObject(request,windowId,requestId);
+ return processRequest(requestResponse,request);
+ }
+
+ /***
+ * Implements the time based SLA over the interpreters run commands implementation. See
+ * {@link InterpreterThread#executeMethodCall(String, List, Class)}
+ * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param request The payload of the request
+ * @return A response object with the results of the execution. Null if the request could not be processed on time.
+ * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+ * queue
+ */
+ public <T> PythonRequestResponse<T> executeMethodCall(long windowId, long requestId,
+ PythonInterpreterRequest<T> request) throws InterruptedException
+ {
+ request.setCommandType(PythonCommandType.METHOD_INVOCATION_COMMAND);
+ PythonRequestResponse requestResponse = buildRequestRespObject(request, windowId,requestId);
+ return processRequest(requestResponse,request);
+ }
+
+ /***
+ * Implements the time based SLA over the interpreters run commands implementation. See
+ * {@link InterpreterThread#executeScript(String)}
+ * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param request The payload of the request
+ * @return A response object with the results of the execution. Null if the request could not be processed on time.
+ * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+ * queue
+ */
+ public PythonRequestResponse<Void> executeScript(long windowId,long requestId,PythonInterpreterRequest<Void> request)
+ throws InterruptedException
+ {
+ request.setCommandType(PythonCommandType.SCRIPT_COMMAND);
+ PythonRequestResponse<Void> requestResponse = buildRequestRespObject(request, windowId,requestId);
+ return processRequest(requestResponse,request);
+ }
+
+
+ /***
+ * Implements the time based SLA over the interpreters run commands implementation. See
+ * {@link InterpreterThread#eval(String, String, Map, boolean, Class)}
+ * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+ * @param request The payload of the request
+ * @return A response object with the results of the execution. Null if the request could not be processed on time.
+ * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+ * queue
+ */
+ public <T> PythonRequestResponse<T> eval(long windowId, long requestId,PythonInterpreterRequest<T> request)
+ throws InterruptedException
+ {
+ request.setCommandType(PythonCommandType.EVAL_COMMAND);
+ PythonRequestResponse<T> requestResponse = buildRequestRespObject(request,windowId,requestId);
+ return processRequest(requestResponse,request);
+ }
+
+ /***
+ * Stops the interpreter
+ * @throws ApexPythonInterpreterException if error while stopping the interpreter
+ */
+ public void stopInterpreter() throws ApexPythonInterpreterException
+ {
+ interpreterThread.setStopped(true);
+ handleToJepRunner.cancel(false);
+ executorService.shutdown();
+ }
+
+ public InterpreterThread getInterpreterThread()
+ {
+ return interpreterThread;
+ }
+
+ public void setInterpreterThread(InterpreterThread interpreterThread)
+ {
+ this.interpreterThread = interpreterThread;
+ }
+
+ public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+ {
+ return cpuSpinPolicyForWaitingInBuffer;
+ }
+
+ public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+ {
+ this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+ }
+
+ public int getBufferCapacity()
+ {
+ return bufferCapacity;
+ }
+
+ public void setBufferCapacity(int bufferCapacity)
+ {
+ this.bufferCapacity = bufferCapacity;
+ }
+
+ public String getInterpreterId()
+ {
+ return interpreterId;
+ }
+
+ public void setInterpreterId(String interpreterId)
+ {
+ this.interpreterId = interpreterId;
+ }
+
+ public BlockingQueue<PythonRequestResponse> getRequestQueue()
+ {
+ return requestQueue;
+ }
+
+ public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
+ {
+ this.requestQueue = requestQueue;
+ }
+
+ public BlockingQueue<PythonRequestResponse> getResponseQueue()
+ {
+ return responseQueue;
+ }
+
+ public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
+ {
+ this.responseQueue = responseQueue;
+ }
+
+ public BlockingQueue<PythonRequestResponse> getDelayedResponsesQueue()
+ {
+ return delayedResponsesQueue;
+ }
+
+ public void setDelayedResponsesQueue(BlockingQueue<PythonRequestResponse> delayedResponsesQueue)
+ {
+ this.delayedResponsesQueue = delayedResponsesQueue;
+ }
+
+ public boolean isCurrentlyBusy()
+ {
+ return interpreterThread.isBusy();
+ }
+
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java
new file mode 100644
index 0000000..b575042
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+import com.google.common.primitives.Ints;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/***
+ * <p>Implements the {@link ApexPythonEngine} interface by using the JEP ( Java Embedded Python ) engine. It is an
+ * in-memory interpreter and has the following characteristics:
+ * <ol>
+ * <li>The python engine allows for 4 major API patterns
+ * <ul>
+ * <li>Execute a method call by accepting parameters to pass to the interpreter</li>
+ * <li>Execute a python script as given in a file path</li>
+ * <li>Evaluate an expression and allows for passing of variables between the java code and the python
+ * in-memory interpreter bridge</li>
+ * <li>A handy method wherein a series of instructions can be passed in one single java call ( executed as a
+ * sequence of python eval instructions under the hood ) </li>
+ * </ul>
+ * <li>Automatic garbage collection of the variables that are passed from java code to the in memory python
+ * interpreter</li>
+ * <li>Support for all major python libraries. Tensorflow, Keras, Scikit, XGBoost</li>
+ * <li>The python engine uses the concept of a worker thread that is responsible for executing any of the 4
+ * patterns mentioned above. The worker thread is implemented by {@link InterpreterThread}</li>
+ * <li>The implementation allows for SLA based execution model. i.e. the caller can stipulate that if the call is not
+ * complete within a time out, the engine code returns back null. See {@link InterpreterWrapper}</li>
+ * <li>Supports the concept of stragglers i.e. the processing of a request can complete eventually and the
+ * result available from a queue called as the delayed response queue</li>
+ * <li>Supports the concept of executing a call on all of the worker threads if required. This is to ensure the
+ * following use cases:
+ * <ul>
+ * <li>Since this is an interpreter, the users can make use of an earlier calls variable definition if
+ * need be. In such cases, the caller will have the need for a sticky thread i.e. all such calls need to
+ * end up on the same thread.</li>
+ * <li>Another reason is to implement the concept of Dynamic partitioning. Since interpreter accumulates
+ * state due to commands run on it, if a new partition is introduced at runtime, this can failures for all
+ * subsequent commands as they might depend on variables created in previous windows</li>
+ * </ul>
+ * </li>
+ * </li>
+ * </ol>
+ * </p>
+ *
+ * <p> Note that the engine implementation can be used independent of an Operator i.e. as a utility stack if need be.
+ * Some of the API signatures need a window ID and request ID but they do not necessarily mean that the API
+ * signatures are bound to an operator lifecycle. These parameters are used for efficient thread usage only and
+ * the API only needs a monotonically increasing number in true sense.
+ * </p>
+ *
+ * <p>
+ * JEP needs to be installed on all of the YARN nodes prior to the use of the JEP engine until docker support is
+ * available for Apex. Virtual environments are not supported yet. If multiple versions of python are present
+ * on the YARN nodes, ensure the JVM option java.library.path is pointing to the right version of JEP which in
+ * turn will ensure the right version of python to be used at runtime.
+ * </p>
+ *
+ * <p>
+ * JEPPythonEngine works on the concept of a worker pool. The engine maintains a configurable number of workers and
+ * each worker has a dedicated request and response queue. While this class is responsible for choosing the
+ * right worker from the pool of workers for a given request , the {@link InterpreterWrapper} class is responsible
+ * for maintaining the time bound SLAs.
+ * </p>
+ *
+ */
+public class JepPythonEngine implements ApexPythonEngine
+{
+ private static final Logger LOG = LoggerFactory.getLogger(JepPythonEngine.class);
+
+ /* Size of the worker pool */
+ private int numWorkerThreads = 3;
+
+ /* A name that can be used while logging messages and also used to set thread names */
+ private String threadGroupName;
+
+ private static final String JEP_LIBRARY_NAME = "jep";
+
+ private transient List<PythonRequestResponse> commandHistory = new ArrayList<>();
+
+ /* Spin policy for the disruptor queue implementation */
+ private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+ // Represents the number of responses that can be held in the queue
+ private int bufferCapacity = 64;
+
+ /* Time used to sleep in the beginning of the interpreter threads run i.e. start while initializing the interpreter.
+ Note that booting of the memory interpreter can be a really heavy process depending on the libraries that
+ are being loaded and hence this variable */
+ private long sleepTimeAfterInterpreterStart = 3000; // 3 secs
+
+ /**
+ * Represents the queue into which all the stragglers are drained into
+ */
+ private transient BlockingQueue<PythonRequestResponse> delayedResponseQueue =
+ new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+
+ private List<InterpreterWrapper> workers = new ArrayList<>();
+
+ private Map<PythonInterpreterConfig, Object> preInitConfigs;
+
+ private long numStarvedReturns = 0;
+
+ /***
+ * Created the JEP Python engine instance but does not start the interpreters yet
+ * @param threadGroupName A name that represents all the workers in this thread
+ * @param numWorkerThreads Number of workers in the work pool
+ */
+ public JepPythonEngine(String threadGroupName,int numWorkerThreads)
+ {
+ this.numWorkerThreads = numWorkerThreads;
+ this.threadGroupName = threadGroupName;
+ }
+
+ private void initWorkers() throws ApexPythonInterpreterException
+ {
+ LOG.info("Attempting to load the JEP dynamic library");
+ System.loadLibrary(JEP_LIBRARY_NAME);
+ LOG.info("Successfully loaded the JEP dynamic library in memory");
+ SpinPolicy spinPolicyForReqQueue = SpinPolicy.WAITING;
+ if (preInitConfigs.containsKey(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY)) {
+ spinPolicyForReqQueue = (SpinPolicy)preInitConfigs.get(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY);
+ }
+ for ( int i = 0; i < numWorkerThreads; i++) {
+ InterpreterWrapper aWorker = new InterpreterWrapper(threadGroupName + "-" + i,delayedResponseQueue,
+ spinPolicyForReqQueue);
+ aWorker.preInitInterpreter(preInitConfigs);
+ aWorker.startInterpreter();
+ workers.add(aWorker);
+ }
+ }
+
+ /***
+ * Used to select the right worker from the work pool. The goal is to round robin the workers as far as possible.
+ * Factors like busy workers can mean that the next available worker is chosen
+ * @param requestId Used to round robin the requests. Need not necessarily mean only an operator can use this engine.
+ * @return A worker from the worker pool. Null if all workers are busy.
+ */
+ protected InterpreterWrapper selectWorkerForCurrentCall(long requestId)
+ {
+ int slotToLookFor = Ints.saturatedCast(requestId) % numWorkerThreads;
+ LOG.debug("Slot that is being looked for in the worker pool " + slotToLookFor);
+ boolean isWorkerFound = false;
+ int numWorkersScannedForAvailability = 0;
+ InterpreterWrapper aWorker = null;
+ while ( (!isWorkerFound) && (numWorkersScannedForAvailability < numWorkerThreads)) {
+ aWorker = workers.get(slotToLookFor);
+ numWorkersScannedForAvailability = numWorkersScannedForAvailability + 1;
+ if (!aWorker.isCurrentlyBusy()) {
+ isWorkerFound = true;
+ LOG.debug("Found worker with index as " + slotToLookFor);
+ break;
+ } else {
+ LOG.debug("Thread ID is currently busy " + aWorker.getInterpreterId());
+ slotToLookFor = slotToLookFor + 1;
+ if ( slotToLookFor == numWorkerThreads) {
+ slotToLookFor = 0;
+ }
+ }
+ }
+ if (isWorkerFound) {
+ return aWorker;
+ } else {
+ numStarvedReturns += 1;
+ return null;
+ }
+ }
+
+ /***
+ * See {@link ApexPythonEngine#preInitInterpreter(Map)} for more details
+ * @param preInitConfigs The configuration that is going to be used by the interpreter.See constants
+ * defined in {@link PythonInterpreterConfig} for a list of keys available
+ * @throws ApexPythonInterpreterException if an issue while executing the pre interpreter logic
+ */
+ @Override
+ public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
+ throws ApexPythonInterpreterException
+ {
+ this.preInitConfigs = preInitConfigs;
+ }
+
+ /***
+ * Starts all of the worker threads. Also sleeps for a few moments to ensure "fat" frameworks like Tensorflow can
+ * be allowed to boot completely.
+ * @throws ApexPythonInterpreterException
+ */
+ @Override
+ public void startInterpreter() throws ApexPythonInterpreterException
+ {
+ initWorkers();
+ try {
+ if (sleepTimeAfterInterpreterStart > 0) {
+ LOG.debug("Sleeping to let the interpreter boot up in memory");
+ Thread.sleep(sleepTimeAfterInterpreterStart);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /***
+ * Used to execute all of the commands from the command history when an operator is instantiating a new instance of
+ * the engine. Used by the dynamic partitioner to let a newly provisioned operator to catch up to the state of all of
+ * the remaining operator instances
+ * @throws ApexPythonInterpreterException
+ */
+ @Override
+ public void postStartInterpreter() throws ApexPythonInterpreterException
+ {
+ for ( InterpreterWrapper wrapper : workers) {
+ for (PythonRequestResponse requestResponse : commandHistory) {
+ PythonInterpreterRequest requestPayload = requestResponse.getPythonInterpreterRequest();
+ try {
+ wrapper.processRequest(requestResponse,requestPayload);
+ } catch (InterruptedException e) {
+ throw new ApexPythonInterpreterException(e);
+ }
+ }
+ }
+ }
+
+ /***
+ * See {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+ * details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+ * set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+ * number of worker threads
+ * @param executionMode Whether these commands need to be run on all worker nodes or any of the worker node
+ * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+ * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+ * @param request Represents the request to be processed.
+ * @return A map containing the command as key and boolean representing success or failure as the value.
+ * @throws ApexPythonInterpreterException
+ */
+ @Override
+ public Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode,long windowId,
+ long requestId, PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException
+ {
+ checkNotNullConditions(request);
+ checkNotNull(request.getGenericCommandsRequestPayload(), "Run commands payload not set");
+ checkNotNull(request.getGenericCommandsRequestPayload().getGenericCommands(),
+ "Commands that need to be run not set");
+ Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
+ PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+ try {
+ if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+ LOG.debug("Executing run commands on all of the interpreter worker threads");
+ long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(),request.getTimeUnit()) /
+ numWorkerThreads;
+ LOG.debug("Allocating " + timeOutPerWorker + " nanoseconds for each of the worker threads");
+ if ( timeOutPerWorker == 0) {
+ timeOutPerWorker = 1;
+ }
+ request.setTimeout(timeOutPerWorker);
+ request.setTimeUnit(TimeUnit.NANOSECONDS);
+ for ( InterpreterWrapper wrapper : workers) {
+ lastSuccessfullySubmittedRequest = wrapper.runCommands(windowId,requestId,request);
+ if (lastSuccessfullySubmittedRequest != null) {
+ returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ }
+ }
+ if ( returnStatus.size() > 0) {
+ commandHistory.add(lastSuccessfullySubmittedRequest);
+ }
+ } else {
+ InterpreterWrapper currentThread = null;
+ if (executionMode.equals(WorkerExecutionMode.ANY)) {
+ LOG.debug("Executing run commands on a single interpreter worker thread");
+ currentThread = selectWorkerForCurrentCall(requestId);
+ }
+ if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+ currentThread = workers.get(request.hashCode() % numWorkerThreads);
+ LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+ }
+ if (currentThread != null) {
+ lastSuccessfullySubmittedRequest = currentThread.runCommands(windowId, requestId, request);
+ if (lastSuccessfullySubmittedRequest != null) {
+ returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ }
+ } else {
+ throw new ApexPythonInterpreterException("No free interpreter threads available." +
+ " Consider increasing workers and relaunch");
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new ApexPythonInterpreterException(e);
+ }
+ return returnStatus;
+ }
+
+ /***
+ * See {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+ * details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+ * set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+ * number of worker threads
+ *
+ * @param executionMode If the method call needs to be invoked on all workers or any single worker
+ * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+ * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+ * @param req Represents the request to be processed.
+ * @param <T>
+ * @return
+ * @throws ApexPythonInterpreterException
+ */
+ @Override
+ public <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
+ long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException
+ {
+ checkNotNullConditions(req);
+ checkNotNull(req.getMethodCallRequest(), "Method call info not set");
+ checkNotNull(req.getMethodCallRequest().getNameOfMethod(), "Method name not set");
+ Map<String,PythonRequestResponse<T>> returnStatus = new HashMap<>();
+ req.setCommandType(PythonCommandType.METHOD_INVOCATION_COMMAND);
+ PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+ try {
+ if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+ long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
+ numWorkerThreads;
+ if ( timeOutPerWorker == 0) {
+ timeOutPerWorker = 1;
+ }
+ req.setTimeout(timeOutPerWorker);
+ req.setTimeUnit(TimeUnit.NANOSECONDS);
+ for ( InterpreterWrapper wrapper : workers) {
+ lastSuccessfullySubmittedRequest = wrapper.executeMethodCall(windowId,requestId,req);
+ if ( lastSuccessfullySubmittedRequest != null) {
+ returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ }
+ }
+ if ( returnStatus.size() > 0) {
+ commandHistory.add(lastSuccessfullySubmittedRequest);
+ }
+ } else {
+ InterpreterWrapper currentThread = null;
+ if (executionMode.equals(WorkerExecutionMode.ANY)) {
+ currentThread = selectWorkerForCurrentCall(requestId);
+ }
+ if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+ currentThread = workers.get(req.hashCode() % numWorkerThreads);
+ LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+ }
+ if (currentThread != null) {
+ lastSuccessfullySubmittedRequest = currentThread.executeMethodCall(windowId, requestId, req);
+ if (lastSuccessfullySubmittedRequest != null) {
+ returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ } else {
+ throw new ApexPythonInterpreterException("No free interpreter threads available." +
+ " Consider increasing workers and relaunch");
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new ApexPythonInterpreterException(e);
+ }
+ return returnStatus;
+ }
+
+ /***
+ * See {@link ApexPythonEngine#executeScript(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+ * details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+ * set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+ * number of worker threads
+ * @param executionMode If the method call needs to be invoked on all workers or any single worker
+ * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+ * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+ * @param req Represents the request to be processed.
+ * @return
+ * @throws ApexPythonInterpreterException
+ */
+ @Override
+ public Map<String,PythonRequestResponse<Void>> executeScript(WorkerExecutionMode executionMode,long windowId,
+ long requestId, PythonInterpreterRequest<Void> req)
+ throws ApexPythonInterpreterException
+ {
+ checkNotNullConditions(req);
+ checkNotNull(req.getScriptExecutionRequestPayload(), "Script execution info not set");
+ checkNotNull(req.getScriptExecutionRequestPayload().getScriptName(), "Script name not set");
+ Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
+ PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+ try {
+ if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+ long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
+ numWorkerThreads;
+ if ( timeOutPerWorker == 0) {
+ timeOutPerWorker = 1;
+ }
+ req.setTimeout(timeOutPerWorker);
+ req.setTimeUnit(TimeUnit.NANOSECONDS);
+ for ( InterpreterWrapper wrapper : workers) {
+ lastSuccessfullySubmittedRequest = wrapper.executeScript(windowId,requestId,req);
+ if (lastSuccessfullySubmittedRequest != null) {
+ returnStatus.put(wrapper.getInterpreterId(),lastSuccessfullySubmittedRequest);
+ }
+ }
+ if ( returnStatus.size() > 0) {
+ commandHistory.add(lastSuccessfullySubmittedRequest);
+ }
+ } else {
+ InterpreterWrapper currentThread = null;
+ if (executionMode.equals(WorkerExecutionMode.ANY)) {
+ currentThread = selectWorkerForCurrentCall(requestId);
+ }
+ if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+ currentThread = workers.get(req.hashCode() % numWorkerThreads);
+ LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+ }
+ if (currentThread != null) {
+ lastSuccessfullySubmittedRequest = currentThread.executeScript(windowId, requestId, req);
+ if (lastSuccessfullySubmittedRequest != null) {
+ returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ }
+ } else {
+ throw new ApexPythonInterpreterException("No free interpreter threads available." +
+ " Consider increasing workers and relaunch");
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new ApexPythonInterpreterException(e);
+ }
+ return returnStatus;
+ }
+
+ private void checkNotNullConditions(PythonInterpreterRequest request)
+ {
+ checkNotNull(request, "Request object cannnot be null");
+ checkNotNull(request.getTimeout(), "Time out value not set");
+ checkNotNull(request.getTimeUnit(), "Time out unit not set");
+ }
+
+ /***
+ * See {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+ * details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+ * set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+ * number of worker threads
+ * @param executionMode If the method call needs to be invoked on all workers or any single worker
+ * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+ * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+ * @param request
+ * @param <T>
+ * @return
+ * @throws ApexPythonInterpreterException
+ */
+ @Override
+ public <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode,long windowId, long requestId,
+ PythonInterpreterRequest<T> request) throws ApexPythonInterpreterException
+ {
+ checkNotNullConditions(request);
+ checkNotNull(request.getEvalCommandRequestPayload(), "Eval command info not set");
+ checkNotNull(request.getEvalCommandRequestPayload().getEvalCommand(),"Eval command not set");
+ Map<String,PythonRequestResponse<T>> statusOfEval = new HashMap<>();
+ PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+ try {
+ if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+ long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(), request.getTimeUnit()) /
+ numWorkerThreads;
+ if ( timeOutPerWorker == 0) {
+ timeOutPerWorker = 1;
+ }
+ request.setTimeout(timeOutPerWorker);
+ request.setTimeUnit(TimeUnit.NANOSECONDS);
+ for ( InterpreterWrapper wrapper : workers) {
+ lastSuccessfullySubmittedRequest = wrapper.eval(windowId,requestId,request);
+ if (lastSuccessfullySubmittedRequest != null) {
+ statusOfEval.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ }
+ }
+ commandHistory.add(lastSuccessfullySubmittedRequest);
+ } else {
+ InterpreterWrapper currentThread = null;
+ if (executionMode.equals(WorkerExecutionMode.ANY)) {
+ currentThread = selectWorkerForCurrentCall(requestId);
+ }
+ if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+ currentThread = workers.get(request.hashCode() % numWorkerThreads);
+ LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+ }
+ if (currentThread != null) {
+ lastSuccessfullySubmittedRequest = currentThread.eval(windowId, requestId, request);
+ if (lastSuccessfullySubmittedRequest != null) {
+ statusOfEval.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+ }
+ } else {
+ throw new ApexPythonInterpreterException("No free interpreter threads available." +
+ " Consider increasing workers and relaunch");
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new ApexPythonInterpreterException(e);
+ }
+ return statusOfEval;
+ }
+
+ @Override
+ public void stopInterpreter() throws ApexPythonInterpreterException
+ {
+ for ( InterpreterWrapper wrapper : workers) {
+ wrapper.stopInterpreter();
+ }
+ }
+
+ public int getNumWorkerThreads()
+ {
+ return numWorkerThreads;
+ }
+
+ public void setNumWorkerThreads(int numWorkerThreads)
+ {
+ this.numWorkerThreads = numWorkerThreads;
+ }
+
+ public List<InterpreterWrapper> getWorkers()
+ {
+ return workers;
+ }
+
+ public void setWorkers(List<InterpreterWrapper> workers)
+ {
+ this.workers = workers;
+ }
+
+ @Override
+ public List<PythonRequestResponse> getCommandHistory()
+ {
+ return commandHistory;
+ }
+
+ @Override
+ public void setCommandHistory(List<PythonRequestResponse> commandHistory)
+ {
+ this.commandHistory = commandHistory;
+ }
+
+ public long getSleepTimeAfterInterpreterStart()
+ {
+ return sleepTimeAfterInterpreterStart;
+ }
+
+ public void setSleepTimeAfterInterpreterStart(long sleepTimeAfterInterpreterStart)
+ {
+ this.sleepTimeAfterInterpreterStart = sleepTimeAfterInterpreterStart;
+ }
+
+ @Override
+ public BlockingQueue<PythonRequestResponse> getDelayedResponseQueue()
+ {
+ return delayedResponseQueue;
+ }
+
+ @Override
+ public void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue)
+ {
+ this.delayedResponseQueue = delayedResponseQueue;
+ }
+
+ public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+ {
+ return cpuSpinPolicyForWaitingInBuffer;
+ }
+
+ public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+ {
+ this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+ }
+
+ public int getBufferCapacity()
+ {
+ return bufferCapacity;
+ }
+
+ public void setBufferCapacity(int bufferCapacity)
+ {
+ this.bufferCapacity = bufferCapacity;
+ }
+
+ @Override
+ public long getNumStarvedReturns()
+ {
+ return numStarvedReturns;
+ }
+
+ @Override
+ public void setNumStarvedReturns(long numStarvedReturns)
+ {
+ this.numStarvedReturns = numStarvedReturns;
+ }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java
new file mode 100644
index 0000000..dd75956
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+/**
+ * Waiting strategies named for sleeping versus busy spinning.
+ * NOTE(review): JepPythonEngine in this package imports com.conversantmedia.util.concurrent.SpinPolicy, which
+ * shadows this enum there; confirm whether this enum is still referenced anywhere.
+ */
+public enum SpinPolicy
+{
+  /** Presumably: yield the CPU (sleep) while waiting. */
+  SLEEP,
+  /** Presumably: keep the CPU spinning while waiting. */
+  BUSY_SPIN;
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java
new file mode 100644
index 0000000..9b05d73
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.jep;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java
new file mode 100644
index 0000000..ba84249
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java
new file mode 100644
index 0000000..6312b29
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.util.KryoCloneUtils;
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+
+/**
+ * Base partitioner for {@link BasePythonExecutionOperator} instances. The choice of target partitions is delegated to
+ * {@code buildTargetPartitions}, and {@link #clonePartition()} creates a new partition from a Kryo deep copy of the
+ * prototype operator. See {@link ThreadStarvationBasedPartitioner} for a concrete strategy.
+ */
+public abstract class AbstractPythonExecutionPartitioner implements Partitioner<BasePythonExecutionOperator>
+{
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractPythonExecutionPartitioner.class);
+
+ /** Operator instance that is deep-copied whenever {@link #clonePartition()} creates a new partition. */
+ @JsonIgnore
+ protected BasePythonExecutionOperator prototypePythonOperator;
+
+ public AbstractPythonExecutionPartitioner(BasePythonExecutionOperator prototypePythonOperator)
+ {
+   this.prototypePythonOperator = prototypePythonOperator;
+ }
+
+ /**
+  * Delegates directly to {@code buildTargetPartitions}.
+  */
+ @Override
+ public Collection<Partition<BasePythonExecutionOperator>> definePartitions(
+     Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+ {
+   return buildTargetPartitions(partitions, context);
+ }
+
+ /**
+  * Computes the full set of partitions that should exist after this repartitioning round.
+  *
+  * @param partitions the current partitions
+  * @param context the partitioning context supplied by the platform
+  * @return the partitions to deploy
+  */
+ protected abstract List<Partition<BasePythonExecutionOperator>> buildTargetPartitions(
+     Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context);
+
+ @Override
+ public void partitioned(Map<Integer, Partition<BasePythonExecutionOperator>> partitions)
+ {
+   // Intentionally empty: no bookkeeping is needed after partitions are deployed.
+ }
+
+ /**
+  * Creates a fresh partition backed by a Kryo deep copy of the prototype operator.
+  *
+  * @return a new partition wrapping the cloned operator
+  */
+ public Partitioner.Partition<BasePythonExecutionOperator> clonePartition()
+ {
+   BasePythonExecutionOperator clonedOperator = KryoCloneUtils.cloneObject(prototypePythonOperator);
+   return new DefaultPartition<>(clonedOperator);
+ }
+}
+
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java
new file mode 100644
index 0000000..3acb32c
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+/**
+ * The partitioning strategies that can be selected for a python execution operator.
+ */
+public enum PythonExecutionPartitionerType
+{
+  /** Presumably selects {@link ThreadStarvationBasedPartitioner} (starvation-driven scale out). */
+  THREAD_STARVATION_BASED
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java
new file mode 100644
index 0000000..f93ead9
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+/**
+ * A partitioner that adds another instance of the python operator when the percentage of starved requests
+ * reported by an operator for a checkpoint window exceeds a threshold.
+ * <p>
+ * The threshold is read from each operator via
+ * {@link BasePythonExecutionOperator#getStarvationPercentBeforeSpawningNewInstance()} and is compared against a
+ * percentage in the range 0-100. Note that the {@code threadStarvationThresholdRatio} property of this class is
+ * currently not consulted by {@link #buildTargetPartitions}.
+ */
+public class ThreadStarvationBasedPartitioner extends AbstractPythonExecutionPartitioner
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadStarvationBasedPartitioner.class);
+
+  private float threadStarvationThresholdRatio;
+
+  public ThreadStarvationBasedPartitioner(BasePythonExecutionOperator prototypePythonOperator)
+  {
+    super(prototypePythonOperator);
+  }
+
+  /**
+   * Calculates the partitions that are required based on the starvation reported by each current partition.
+   * All current partitions are kept. For every operator whose starved-request percentage (from its per-checkpoint
+   * counters) exceeds its configured threshold, one new partition is added. The new instance is given a copy of
+   * the original operator's accumulated command history (if any) so that it can be in the same state as the
+   * original operator when it starts processing new tuples.
+   *
+   * @param partitions The current set of partitions; {@code null} yields an empty list
+   * @param context The partitioning context (not used by this strategy)
+   * @return The current partitions, kept intact, followed by any newly cloned partitions
+   */
+  @Override
+  protected List<Partition<BasePythonExecutionOperator>> buildTargetPartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    List<Partition<BasePythonExecutionOperator>> returnList = new ArrayList<>();
+    if (partitions == null) {
+      return returnList;
+    }
+    returnList.addAll(partitions);
+    for (Partition<BasePythonExecutionOperator> aCurrentPartition : partitions) {
+      BasePythonExecutionOperator anOperator = aCurrentPartition.getPartitionedInstance();
+      long starvedCount = anOperator.getNumStarvedReturns();
+      long requestsForCheckpointWindow = anOperator.getNumberOfRequestsProcessedPerCheckpoint();
+      if (requestsForCheckpointWindow == 0) {
+        // No requests processed yet (e.g. the operator is starting for the first time): nothing to decide on.
+        continue;
+      }
+      float starvationPercent = computeStarvationPercent(starvedCount, requestsForCheckpointWindow);
+      if (starvationPercent > anOperator.getStarvationPercentBeforeSpawningNewInstance()) {
+        LOG.info("Creating a new instance of the python operator as starvation % is {}", starvationPercent);
+        Partition<BasePythonExecutionOperator> newInstance = clonePartition();
+        // The clone gets its own copy of the history so the two operators do not share a mutable list.
+        List<PythonRequestResponse> commandHistory = new ArrayList<>();
+        commandHistory.addAll(anOperator.getAccumulatedCommandHistory());
+        newInstance.getPartitionedInstance().setAccumulatedCommandHistory(commandHistory);
+        returnList.add(newInstance);
+      }
+    }
+    return returnList;
+  }
+
+  /**
+   * Returns the percentage (0-100) of requests in a window that were starved.
+   * The division is done in floating point: with {@code long} operands it would truncate to 0 or 1 and the
+   * resulting "percentage" could only ever be 0 or 100.
+   *
+   * @param starvedCount number of starved requests in the window
+   * @param requestCount total number of requests in the window; must be non-zero
+   * @return {@code starvedCount / requestCount * 100}
+   */
+  private static float computeStarvationPercent(long starvedCount, long requestCount)
+  {
+    return (starvedCount * 100.0f) / requestCount;
+  }
+
+  /**
+   * @return the configured starvation threshold ratio. Not consulted by {@link #buildTargetPartitions}, which uses
+   *     each operator's {@code getStarvationPercentBeforeSpawningNewInstance()} instead.
+   */
+  public float getThreadStarvationThresholdRatio()
+  {
+    return threadStarvationThresholdRatio;
+  }
+
+  /**
+   * @param threadStarvationThresholdRatio the starvation threshold ratio to store
+   */
+  public void setThreadStarvationThresholdRatio(float threadStarvationThresholdRatio)
+  {
+    this.threadStarvationThresholdRatio = threadStarvationThresholdRatio;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java
new file mode 100644
index 0000000..94b3e37
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.partitioner;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java
new file mode 100644
index 0000000..98aaff5
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.Map;
+
+/**
+ * Mutable holder for the details of an eval request: the command text, the parameters to make available to it,
+ * the name of the variable to extract and whether that variable should be deleted afterwards.
+ * Presumably used together with {@link PythonCommandType#EVAL_COMMAND} requests.
+ */
+public class EvalCommandRequestPayload
+{
+  private boolean deleteVariableAfterEvalCall;
+
+  private String variableNameToExtractInEvalCall;
+
+  private String evalCommand;
+
+  private Map<String,Object> paramsForEvalCommand;
+
+  /** @return the eval command text, or {@code null} if not set */
+  public String getEvalCommand()
+  {
+    return evalCommand;
+  }
+
+  /** @param evalCommand the eval command text */
+  public void setEvalCommand(String evalCommand)
+  {
+    this.evalCommand = evalCommand;
+  }
+
+  /** @return the parameter map for the eval command, or {@code null} if not set */
+  public Map<String, Object> getParamsForEvalCommand()
+  {
+    return paramsForEvalCommand;
+  }
+
+  /** @param paramsForEvalCommand the parameter map; stored by reference, not copied */
+  public void setParamsForEvalCommand(Map<String, Object> paramsForEvalCommand)
+  {
+    this.paramsForEvalCommand = paramsForEvalCommand;
+  }
+
+  /** @return the name of the variable to extract after evaluation, or {@code null} if not set */
+  public String getVariableNameToExtractInEvalCall()
+  {
+    return variableNameToExtractInEvalCall;
+  }
+
+  /** @param variableNameToExtractInEvalCall the name of the variable to extract after evaluation */
+  public void setVariableNameToExtractInEvalCall(String variableNameToExtractInEvalCall)
+  {
+    this.variableNameToExtractInEvalCall = variableNameToExtractInEvalCall;
+  }
+
+  /** @return the delete-after-eval flag; {@code false} unless set */
+  public boolean isDeleteVariableAfterEvalCall()
+  {
+    return deleteVariableAfterEvalCall;
+  }
+
+  /** @param deleteVariableAfterEvalCall the delete-after-eval flag */
+  public void setDeleteVariableAfterEvalCall(boolean deleteVariableAfterEvalCall)
+  {
+    this.deleteVariableAfterEvalCall = deleteVariableAfterEvalCall;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java
new file mode 100644
index 0000000..b037248
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.List;
+
+/**
+ * Mutable holder for a list of generic command strings to be run by the python engine.
+ */
+public class GenericCommandsRequestPayload
+{
+  // Package-private (no modifier), as originally declared.
+  List<String> genericCommands;
+
+  /**
+   * @return the command list as stored (not copied), or {@code null} if never set
+   */
+  public List<String> getGenericCommands()
+  {
+    return this.genericCommands;
+  }
+
+  /**
+   * @param genericCommands the commands to store; the list is kept by reference
+   */
+  public void setGenericCommands(List<String> genericCommands)
+  {
+    this.genericCommands = genericCommands;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java
new file mode 100644
index 0000000..368cff9
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.List;
+
+/**
+ * Mutable holder for a method invocation request: the name of the method and its positional arguments.
+ */
+public class MethodCallRequestPayload
+{
+  private String nameOfMethod;
+
+  private List<Object> args;
+
+  /** @return the positional arguments as stored (not copied), or {@code null} if never set */
+  public List<Object> getArgs()
+  {
+    return this.args;
+  }
+
+  /** @param args the positional arguments; kept by reference */
+  public void setArgs(List<Object> args)
+  {
+    this.args = args;
+  }
+
+  /** @return the name of the method to invoke, or {@code null} if never set */
+  public String getNameOfMethod()
+  {
+    return this.nameOfMethod;
+  }
+
+  /** @param nameOfMethod the name of the method to invoke */
+  public void setNameOfMethod(String nameOfMethod)
+  {
+    this.nameOfMethod = nameOfMethod;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java
new file mode 100644
index 0000000..3721b84
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+/**
+ * The kinds of command a {@link PythonInterpreterRequest} can carry. Each kind corresponds to one of the payload
+ * classes in this package. The constant order is kept stable in case ordinals are persisted.
+ */
+public enum PythonCommandType
+{
+  /** A list of generic command strings (see {@link GenericCommandsRequestPayload}). */
+  GENERIC_COMMANDS,
+  /** An eval request (see {@link EvalCommandRequestPayload}). */
+  EVAL_COMMAND,
+  /** A script execution request (see {@link ScriptExecutionRequestPayload}). */
+  SCRIPT_COMMAND,
+  /** A method invocation request (see {@link MethodCallRequestPayload}). */
+  METHOD_INVOCATION_COMMAND
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java
new file mode 100644
index 0000000..1b8a46f
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A request to be executed by the python engine.
+ * <p>
+ * The request holds a {@link PythonCommandType}, a timeout with its unit, the class the caller expects the result
+ * to have, and one slot per payload kind. Presumably only the payload matching the command type is populated;
+ * confirm against the engine implementation.
+ *
+ * @param <T> the expected type of the result of this request
+ */
+public class PythonInterpreterRequest<T>
+{
+  PythonCommandType commandType;
+
+  long timeout;
+
+  TimeUnit timeUnit;
+
+  MethodCallRequestPayload methodCallRequest;
+
+  GenericCommandsRequestPayload genericCommandsRequestPayload;
+
+  EvalCommandRequestPayload evalCommandRequestPayload;
+
+  ScriptExecutionRequestPayload scriptExecutionRequestPayload;
+
+  Class<T> expectedReturnType;
+
+  /**
+   * No-argument constructor that exists only for Kryo serialization; user code should not call it.
+   */
+  public PythonInterpreterRequest()
+  {
+  }
+
+  /**
+   * @param expectedReturnType the class the result of this request is expected to have
+   */
+  public PythonInterpreterRequest(Class<T> expectedReturnType)
+  {
+    this.expectedReturnType = expectedReturnType;
+  }
+
+  // ---- command kind, timeout and expected result type ----
+
+  public PythonCommandType getCommandType()
+  {
+    return commandType;
+  }
+
+  public void setCommandType(PythonCommandType commandType)
+  {
+    this.commandType = commandType;
+  }
+
+  public long getTimeout()
+  {
+    return timeout;
+  }
+
+  public void setTimeout(long timeout)
+  {
+    this.timeout = timeout;
+  }
+
+  public TimeUnit getTimeUnit()
+  {
+    return timeUnit;
+  }
+
+  public void setTimeUnit(TimeUnit timeUnit)
+  {
+    this.timeUnit = timeUnit;
+  }
+
+  public Class<T> getExpectedReturnType()
+  {
+    return expectedReturnType;
+  }
+
+  public void setExpectedReturnType(Class<T> expectedReturnType)
+  {
+    this.expectedReturnType = expectedReturnType;
+  }
+
+  // ---- payloads (stored by reference) ----
+
+  public MethodCallRequestPayload getMethodCallRequest()
+  {
+    return methodCallRequest;
+  }
+
+  public void setMethodCallRequest(MethodCallRequestPayload methodCallRequest)
+  {
+    this.methodCallRequest = methodCallRequest;
+  }
+
+  public GenericCommandsRequestPayload getGenericCommandsRequestPayload()
+  {
+    return genericCommandsRequestPayload;
+  }
+
+  public void setGenericCommandsRequestPayload(GenericCommandsRequestPayload genericCommandsRequestPayload)
+  {
+    this.genericCommandsRequestPayload = genericCommandsRequestPayload;
+  }
+
+  public EvalCommandRequestPayload getEvalCommandRequestPayload()
+  {
+    return evalCommandRequestPayload;
+  }
+
+  public void setEvalCommandRequestPayload(EvalCommandRequestPayload evalCommandRequestPayload)
+  {
+    this.evalCommandRequestPayload = evalCommandRequestPayload;
+  }
+
+  public ScriptExecutionRequestPayload getScriptExecutionRequestPayload()
+  {
+    return scriptExecutionRequestPayload;
+  }
+
+  public void setScriptExecutionRequestPayload(ScriptExecutionRequestPayload scriptExecutionRequestPayload)
+  {
+    this.scriptExecutionRequestPayload = scriptExecutionRequestPayload;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java
new file mode 100644
index 0000000..64dc1a5
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.Map;
+
+/**
+ * The result of executing a {@link PythonInterpreterRequest}: the typed response value plus a map of
+ * per-command status flags.
+ *
+ * @param <T> the type of the response value
+ */
+public class PythonInterpreterResponse<T>
+{
+  /** Set by the one-argument constructor; this class exposes no getter for it. */
+  Class<T> responseTypeClass;
+
+  Map<String,Boolean> commandStatus;
+
+  T response;
+
+  /**
+   * No-argument constructor to be used only by the Kryo serializer framework.
+   */
+  public PythonInterpreterResponse()
+  {
+  }
+
+  /**
+   * @param responseTypeClassHandle the class of the response value
+   */
+  public PythonInterpreterResponse(Class<T> responseTypeClassHandle)
+  {
+    this.responseTypeClass = responseTypeClassHandle;
+  }
+
+  /** @return the response value, or {@code null} if not set */
+  public T getResponse()
+  {
+    return this.response;
+  }
+
+  /** @param response the response value */
+  public void setResponse(T response)
+  {
+    this.response = response;
+  }
+
+  /** @return the per-command status map as stored (not copied), or {@code null} if not set */
+  public Map<String, Boolean> getCommandStatus()
+  {
+    return this.commandStatus;
+  }
+
+  /** @param commandStatus the per-command status map; kept by reference */
+  public void setCommandStatus(Map<String, Boolean> commandStatus)
+  {
+    this.commandStatus = commandStatus;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java
new file mode 100644
index 0000000..9e3958c
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+
+/**
+ * Pairs a {@link PythonInterpreterRequest} with its {@link PythonInterpreterResponse}, together with the request
+ * id, the window id and the start/completion timestamps recorded for it.
+ * <p>
+ * NOTE(review): the request/response fields use raw types even though the class is generic in {@code T}.
+ *
+ * @param <T> the expected result type of the wrapped request
+ */
+public class PythonRequestResponse<T>
+{
+  PythonInterpreterRequest pythonInterpreterRequest;
+
+  PythonInterpreterResponse pythonInterpreterResponse;
+
+  long requestId;
+
+  long windowId;
+
+  long requestStartTime;
+
+  long requestCompletionTime;
+
+  // ---- request / response pair ----
+
+  public PythonInterpreterRequest getPythonInterpreterRequest()
+  {
+    return this.pythonInterpreterRequest;
+  }
+
+  public void setPythonInterpreterRequest(PythonInterpreterRequest pythonInterpreterRequest)
+  {
+    this.pythonInterpreterRequest = pythonInterpreterRequest;
+  }
+
+  public PythonInterpreterResponse getPythonInterpreterResponse()
+  {
+    return this.pythonInterpreterResponse;
+  }
+
+  public void setPythonInterpreterResponse(PythonInterpreterResponse pythonInterpreterResponse)
+  {
+    this.pythonInterpreterResponse = pythonInterpreterResponse;
+  }
+
+  // ---- identifiers ----
+
+  public long getRequestId()
+  {
+    return this.requestId;
+  }
+
+  public void setRequestId(long requestId)
+  {
+    this.requestId = requestId;
+  }
+
+  public long getWindowId()
+  {
+    return this.windowId;
+  }
+
+  public void setWindowId(long windowId)
+  {
+    this.windowId = windowId;
+  }
+
+  // ---- timestamps (units are whatever the caller records) ----
+
+  public long getRequestStartTime()
+  {
+    return this.requestStartTime;
+  }
+
+  public void setRequestStartTime(long requestStartTime)
+  {
+    this.requestStartTime = requestStartTime;
+  }
+
+  public long getRequestCompletionTime()
+  {
+    return this.requestCompletionTime;
+  }
+
+  public void setRequestCompletionTime(long requestCompletionTime)
+  {
+    this.requestCompletionTime = requestCompletionTime;
+  }
+}
+
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java
new file mode 100644
index 0000000..403c73d
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+/**
+ * Mutable holder for a script execution request; carries only the script name.
+ */
+public class ScriptExecutionRequestPayload
+{
+  // Package-private (no modifier), as originally declared.
+  String scriptName;
+
+  /** @return the script name, or {@code null} if never set */
+  public String getScriptName()
+  {
+    return this.scriptName;
+  }
+
+  /** @param scriptName the script name to store */
+  public void setScriptName(String scriptName)
+  {
+    this.scriptName = scriptName;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java
new file mode 100644
index 0000000..dce69a6
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java
new file mode 100644
index 0000000..839f493
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import jep.NDArray;
+
+/**
+ * A Kryo serializer that can be used to serialize and deserialize a JEP {@link NDArray} instance. It is
+ * recommended that {@link NDimensionalArray} be used in lieu of this class, because NDArray is a JEP-specific
+ * data structure and will not give flexibility if the python engine is changed in the future.
+ * <p>
+ * Wire format written by {@link #write} and consumed by {@link #read}:
+ * <ol>
+ * <li>int: product of the dimension sizes (element count of the flattened data)</li>
+ * <li>int: number of dimensions</li>
+ * <li>ints: the dimension sizes</li>
+ * <li>boolean: the NDArray unsigned flag</li>
+ * <li>String: canonical name of the array component type, or {@code null} when there is no data</li>
+ * <li>the flattened elements; boolean arrays are written as shorts (1 = true, 0 = false)</li>
+ * </ol>
+ */
+public class NDArrayKryoSerializer extends Serializer<NDArray>
+{
+  private static final short TRUE_AS_SHORTINT = 1;
+
+  private static final short FALSE_AS_SHORTINT = 0;
+
+  @Override
+  public void setGenerics(Kryo kryo, Class[] generics)
+  {
+    super.setGenerics(kryo, generics);
+  }
+
+  /**
+   * Writes the header and the flattened contents of {@code ndArray}. An NDArray without data is written as an
+   * empty header with a {@code null} type name, which {@link #read} decodes to {@code null}.
+   *
+   * @throws RuntimeException if the data is not an array of one of the supported primitive types
+   */
+  @Override
+  public void write(Kryo kryo, Output output, NDArray ndArray)
+  {
+    Object dataVal = ndArray.getData();
+    if (dataVal == null) {
+      // read() always consumes a full header; writing nothing here would desynchronize the stream.
+      writeHeader(output, 0, new int[0], false, null);
+      return;
+    }
+    Class<?> componentType = dataVal.getClass().getComponentType(); // null if the data is not an array
+    if (componentType == null) {
+      throw new RuntimeException("Unsupported NDArray type serialization object");
+    }
+    int[] dimensions = ndArray.getDimensions();
+    int arraySizeInSingleDimension = 1;
+    for (int aDimensionSize : dimensions) {
+      arraySizeInSingleDimension *= aDimensionSize;
+    }
+    String dataType = componentType.getCanonicalName();
+    writeHeader(output, arraySizeInSingleDimension, dimensions, ndArray.isUnsigned(), dataType);
+
+    // write the array contents
+    switch (dataType) {
+      case "float":
+        output.writeFloats((float[])dataVal);
+        break;
+      case "int":
+        output.writeInts((int[])dataVal);
+        break;
+      case "double":
+        output.writeDoubles((double[])dataVal);
+        break;
+      case "long":
+        output.writeLongs((long[])dataVal);
+        break;
+      case "short":
+        output.writeShorts((short[])dataVal);
+        break;
+      case "byte":
+        output.writeBytes((byte[])dataVal);
+        break;
+      case "boolean":
+        boolean[] originalBoolArray = (boolean[])dataVal;
+        short[] convertedBoolArray = new short[originalBoolArray.length];
+        for (int i = 0; i < originalBoolArray.length; i++) {
+          convertedBoolArray[i] = originalBoolArray[i] ? TRUE_AS_SHORTINT : FALSE_AS_SHORTINT;
+        }
+        output.writeShorts(convertedBoolArray);
+        break;
+      default:
+        throw new RuntimeException("Unsupported NDArray type serialization object");
+    }
+  }
+
+  /**
+   * Writes the fixed header fields in the order expected by {@link #read}.
+   */
+  private static void writeHeader(Output output, int flattenedLength, int[] dimensions, boolean unsignedFlag,
+      String dataType)
+  {
+    output.writeInt(flattenedLength);
+    output.writeInt(dimensions.length);
+    output.writeInts(dimensions);
+    output.writeBoolean(unsignedFlag);
+    output.writeString(dataType);
+  }
+
+  /**
+   * Reads an NDArray written by {@link #write}.
+   *
+   * @return the reconstructed NDArray, or {@code null} when the stored type name is {@code null} (no data) or
+   *     not one of the supported primitive types
+   */
+  @Override
+  public NDArray read(Kryo kryo, Input input, Class<NDArray> aClass)
+  {
+    int singleDimensionArrayLength = input.readInt();
+    int lengthOfDimensionsArray = input.readInt();
+    int[] dimensions = input.readInts(lengthOfDimensionsArray);
+    // This is the NDArray "unsigned" flag, passed straight back to the NDArray constructor.
+    boolean unsignedFlag = input.readBoolean();
+
+    String dataType = input.readString();
+    if (dataType == null) {
+      return null;
+    }
+    switch (dataType) {
+      case "float":
+        return new NDArray<>(input.readFloats(singleDimensionArrayLength), unsignedFlag, dimensions);
+      case "int":
+        return new NDArray<>(input.readInts(singleDimensionArrayLength), unsignedFlag, dimensions);
+      case "double":
+        return new NDArray<>(input.readDoubles(singleDimensionArrayLength), unsignedFlag, dimensions);
+      case "long":
+        return new NDArray<>(input.readLongs(singleDimensionArrayLength), unsignedFlag, dimensions);
+      case "short":
+        return new NDArray<>(input.readShorts(singleDimensionArrayLength), unsignedFlag, dimensions);
+      case "byte":
+        return new NDArray<>(input.readBytes(singleDimensionArrayLength), unsignedFlag, dimensions);
+      case "boolean":
+        short[] shortsArray = input.readShorts(singleDimensionArrayLength);
+        boolean[] boolsArray = new boolean[shortsArray.length];
+        for (int i = 0; i < shortsArray.length; i++) {
+          boolsArray[i] = TRUE_AS_SHORTINT == shortsArray[i];
+        }
+        return new NDArray<>(boolsArray, unsignedFlag, dimensions);
+      default:
+        return null;
+    }
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java
new file mode 100644
index 0000000..c233855
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import jep.NDArray;
+
+/**
+ * Describes a numpy array as a plain Java object that can be converted to a jep {@link NDArray} on demand.
+ *
+ * <p>A numpy array is built from a flat, single dimensional Java array plus a dimensions array. The dimensions
+ * specify how the flat array is sliced into the multi dimensional shape.</p>
+ *
+ * <p>Only the following flat array types are supported:</p>
+ * <ol>
+ *   <li>float[]</li>
+ *   <li>int[]</li>
+ *   <li>double[]</li>
+ *   <li>long[]</li>
+ *   <li>short[]</li>
+ *   <li>byte[]</li>
+ *   <li>boolean[]</li>
+ * </ol>
+ *
+ * <p>Complex types are not supported. See the example application in the test modules for code snippets and
+ * usage.</p>
+ *
+ * @param <T> The type of the flat single dimensional array. For example, for a numpy array of type float,
+ *     T is {@code float[]}.
+ */
+public class NDimensionalArray<T>
+{
+  /** Shape of the numpy array; describes how {@link #data} is sliced. */
+  int[] dimensions;
+
+  /** The flat, single dimensional array holding all elements. */
+  T data;
+
+  /**
+   * Recorded length of the flat data array. It is neither validated against {@link #data} nor used by
+   * {@link #toNDArray()}.
+   */
+  int lengthOfSequentialArray;
+
+  /**
+   * Passed unchanged as the boolean argument of the {@link NDArray} constructor in {@link #toNDArray()}.
+   * NOTE(review): jep's NDArray appears to treat that argument as an "unsigned" flag; confirm whether
+   * {@code true} here is meant to denote signed or unsigned data.
+   */
+  boolean signedFlag;
+
+  public NDimensionalArray()
+  {
+  }
+
+  /**
+   * Converts this description into a jep {@link NDArray} that can be handed to the python interpreter.
+   *
+   * @return a new NDArray built from {@link #data}, {@link #signedFlag} and {@link #dimensions}
+   */
+  public NDArray<T> toNDArray()
+  {
+    return new NDArray<>(data, signedFlag, dimensions);
+  }
+
+  public int[] getDimensions()
+  {
+    return dimensions;
+  }
+
+  public void setDimensions(int[] dimensions)
+  {
+    this.dimensions = dimensions;
+  }
+
+  public int getLengthOfSequentialArray()
+  {
+    return lengthOfSequentialArray;
+  }
+
+  public void setLengthOfSequentialArray(int lengthOfSequentialArray)
+  {
+    this.lengthOfSequentialArray = lengthOfSequentialArray;
+  }
+
+  public boolean isSignedFlag()
+  {
+    return signedFlag;
+  }
+
+  public void setSignedFlag(boolean signedFlag)
+  {
+    this.signedFlag = signedFlag;
+  }
+
+  public T getData()
+  {
+    return data;
+  }
+
+  public void setData(T data)
+  {
+    this.data = data;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java
new file mode 100644
index 0000000..c998c20
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.GenericCommandsRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+
+/**
+ * A handy utility class that implements the boilerplate needed to build request objects for the Python engine.
+ * Only the commonly used request types are covered.
+ */
+public class PythonRequestResponseUtil
+{
+  /**
+   * Builds the request object for the run commands API. See
+   * {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details.
+   *
+   * @param commands The python commands to run; stored as the generic commands of the request payload
+   * @param timeOut Timeout for the request to complete
+   * @param timeUnit Unit of time for the timeOut parameter
+   * @return A request object that can be passed to the Python engine API for run commands
+   */
+  public static PythonInterpreterRequest<Void> buildRequestObjectForRunCommands(List<String> commands, long timeOut,
+      TimeUnit timeUnit)
+  {
+    GenericCommandsRequestPayload genericCommandsRequestPayload = new GenericCommandsRequestPayload();
+    genericCommandsRequestPayload.setGenericCommands(commands);
+    PythonInterpreterRequest<Void> request = new PythonInterpreterRequest<>(Void.class);
+    applyTimeout(request, timeOut, timeUnit);
+    request.setGenericCommandsRequestPayload(genericCommandsRequestPayload);
+    return request;
+  }
+
+  /**
+   * Builds the request object for the eval command. See
+   * {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details.
+   *
+   * @param evalCommand The eval expression
+   * @param evalParams Variables that need to be substituted
+   * @param varNameToExtract The name of the variable to extract, if any, after the expression is evaluated.
+   *     Can be null
+   * @param deleteVarAfterExtract Stored as the payload's "delete variable after eval call" setting, i.e. whether
+   *     the extracted variable should be deleted after the eval call
+   * @param timeOut Timeout for the API to complete processing
+   * @param timeUnit Unit of time for the timeOut parameter
+   * @param clazz The class that represents the return type
+   * @param <T> The return type of the request
+   * @return The request object that can be used for the eval command
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForEvalCommand(
+      String evalCommand, Map<String,Object> evalParams, String varNameToExtract,
+      boolean deleteVarAfterExtract, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    EvalCommandRequestPayload evalCommandRequestPayload = new EvalCommandRequestPayload();
+    evalCommandRequestPayload.setEvalCommand(evalCommand);
+    evalCommandRequestPayload.setVariableNameToExtractInEvalCall(varNameToExtract);
+    evalCommandRequestPayload.setParamsForEvalCommand(evalParams);
+    evalCommandRequestPayload.setDeleteVariableAfterEvalCall(deleteVarAfterExtract);
+    applyTimeout(request, timeOut, timeUnit);
+    request.setEvalCommandRequestPayload(evalCommandRequestPayload);
+    return request;
+  }
+
+  /**
+   * Builds the request object for the method call command. See
+   * {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details.
+   *
+   * @param methodName Name of the method
+   * @param methodParams Parameters passed to the method
+   * @param timeOut Time allocated for completing the API call
+   * @param timeUnit Unit of time for the timeOut parameter
+   * @param clazz The class that represents the return type
+   * @param <T> The return type of the request
+   * @return The request object that can be used for method calls
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForMethodCallCommand(
+      String methodName, List<Object> methodParams, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    MethodCallRequestPayload methodCallRequestPayload = new MethodCallRequestPayload();
+    methodCallRequestPayload.setNameOfMethod(methodName);
+    methodCallRequestPayload.setArgs(methodParams);
+    applyTimeout(request, timeOut, timeUnit);
+    request.setMethodCallRequest(methodCallRequestPayload);
+    return request;
+  }
+
+  /**
+   * Builds a request object that can be used for executing script call commands.
+   *
+   * @param scriptPath Full path to the file containing the script
+   * @param timeOut The time allowed to complete the execution of the script
+   * @param timeUnit Unit of time for the timeOut parameter
+   * @param clazz The class that represents the return type
+   * @param <T> The return type of the request
+   * @return The request object that can be used for a script call invocation
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForScriptCallCommand(
+      String scriptPath, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    ScriptExecutionRequestPayload scriptExecutionRequestPayload = new ScriptExecutionRequestPayload();
+    scriptExecutionRequestPayload.setScriptName(scriptPath);
+    applyTimeout(request, timeOut, timeUnit);
+    request.setScriptExecutionRequestPayload(scriptExecutionRequestPayload);
+    return request;
+  }
+
+  /**
+   * Applies the timeout settings shared by every request type (time unit first, then the timeout value).
+   */
+  private static void applyTimeout(PythonInterpreterRequest<?> request, long timeOut, TimeUnit timeUnit)
+  {
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java
new file mode 100644
index 0000000..9a38d81
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.python;
+
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.function.FunctionOperator;
+import org.apache.apex.malhar.lib.function.FunctionOperatorUtil;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Test application that wires {@link PythonPayloadPOJOGenerator} into {@link SimplePythonOpOperator} and passes
+ * the python results to an output function.
+ *
+ * @since 3.8.0
+ */
+@ApplicationAnnotation(name = "PythonOperatorExample")
+public class PythonExecutorApplication implements StreamingApplication
+{
+  /** Applied to every tuple the python operator emits; defaults to {@code FunctionOperatorUtil.CONSOLE_SINK_FN}. */
+  @VisibleForTesting
+  Function.MapFunction<Object, ?> outputFn = FunctionOperatorUtil.CONSOLE_SINK_FN;
+
+  /** The input operator; assigned when {@link #populateDAG} runs. */
+  @VisibleForTesting
+  PythonPayloadPOJOGenerator pojoDataGenerator;
+
+  /** The python processing operator; assigned when {@link #populateDAG} runs. */
+  @VisibleForTesting
+  SimplePythonOpOperator simplePythonOpOperator;
+
+  /**
+   * Builds the DAG {@code Input -> pythonprocessor -> out}.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    pojoDataGenerator = dag.addOperator("Input", new PythonPayloadPOJOGenerator());
+    simplePythonOpOperator = dag.addOperator("pythonprocessor", new SimplePythonOpOperator());
+    FunctionOperator.MapFunctionOperator<Object, ?> output = dag.addOperator("out",
+        new FunctionOperator.MapFunctionOperator<>(outputFn));
+    dag.addStream("InputToPython", pojoDataGenerator.output, simplePythonOpOperator.input);
+    dag.addStream("PythonToOutput", simplePythonOpOperator.outputPort, output.input);
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java
new file mode 100644
index 0000000..fe3d69b
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+import org.apache.apex.malhar.python.test.PythonAvailabilityTestRule;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+
+import static org.junit.Assert.assertEquals;
+
+public class PythonExecutorApplicationTest
+{
+  private static final List<Object> results = Collections.synchronizedList(new ArrayList<>());
+
+  private static final Logger LOG = LoggerFactory.getLogger(PythonExecutorApplicationTest.class);
+
+  /** Upper bound on how long the test waits for all tuples to reach the output function. */
+  private static final int MAX_WAIT_MILLIS = 30000;
+
+  /** Polling interval while waiting for results. */
+  private static final int SLEEP_PER_ITERATION_MILLIS = 1000;
+
+  @Rule
+  public PythonAvailabilityTestRule jepAvailabilityBasedTest = new PythonAvailabilityTestRule();
+
+  /** Collects every tuple emitted by the application into {@link #results}. */
+  @SuppressWarnings("serial")
+  private static final Function.MapFunction<Object, Void> outputFn = new Function.MapFunction<Object, Void>()
+  {
+    @Override
+    public Void f(Object input)
+    {
+      results.add(input);
+      return null;
+    }
+  };
+
+  /**
+   * Launches the application asynchronously in embedded mode, waits (bounded) until the generator's maximum number
+   * of tuples has reached the output function, and asserts that exactly that many results arrived.
+   */
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testApplication() throws Exception
+  {
+    // results is static; clear it so leftovers from an earlier run cannot satisfy the wait loop
+    results.clear();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(Launcher.LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
+    PythonExecutorApplication pythonExecutorApplication = new PythonExecutorApplication();
+    pythonExecutorApplication.outputFn = outputFn;
+    Launcher.AppHandle appHandle = launcher.launchApp(pythonExecutorApplication, conf, launchAttributes);
+    try {
+      int waitedMillis = 0;
+      // wait until expected result count or timeout
+      while (results.size() < pythonExecutorApplication.pojoDataGenerator.getMaxTuples()) {
+        waitedMillis += SLEEP_PER_ITERATION_MILLIS;
+        if (waitedMillis > MAX_WAIT_MILLIS) {
+          break;
+        }
+        LOG.info("Test sleeping until the application time out is reached");
+        Thread.sleep(SLEEP_PER_ITERATION_MILLIS);
+      }
+    } finally {
+      // Always stop the embedded application, even if waiting was interrupted
+      appHandle.shutdown(Launcher.ShutdownMode.KILL);
+    }
+    assertEquals(pythonExecutorApplication.pojoDataGenerator.getMaxTuples(), results.size());
+  }
+}
+
+
+
+
+
+
+
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java b/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java
new file mode 100644
index 0000000..6996d94
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.python;
+
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Generates and emits {@link PythonProcessingPojo} tuples for the python operator to consume. Each tuple carries a
+ * random int, a random float and two {@link #DIMENSION_SIZE} x {@link #DIMENSION_SIZE} numpy array descriptions
+ * (one float, one int) filled with random values.
+ *
+ * <p>Running sums of the generated array values are accumulated in the static {@link #intDimensionSums} and
+ * {@link #floatDimensionSums} arrays, which every new instance resets to zero.</p>
+ *
+ * @since 3.8.0
+ */
+public class PythonPayloadPOJOGenerator implements InputOperator
+{
+  private long tuplesCounter = 0;
+  private long currentWindowTuplesCounter = 0;
+
+  // Limit number of emitted tuples per window
+  @Min(1)
+  private long maxTuplesPerWindow = 150;
+
+  @Min(1)
+  private long maxTuples = 300;
+
+  private final Random random = new Random();
+
+  private static final int MAX_RANDOM_INT = 100;
+
+  public static final int DIMENSION_SIZE = 2;
+
+  /** Number of elements in a flattened DIMENSION_SIZE x DIMENSION_SIZE array. */
+  private static final int ELEMENT_COUNT = DIMENSION_SIZE * DIMENSION_SIZE;
+
+  public static int[] intDimensionSums = new int[ELEMENT_COUNT];
+
+  public static float[] floatDimensionSums = new float[ELEMENT_COUNT];
+
+  public final transient DefaultOutputPort<PythonProcessingPojo> output = new DefaultOutputPort<>();
+
+  public PythonPayloadPOJOGenerator()
+  {
+    // The sums are shared static state, so every new generator starts them from zero. Iterate over each array's
+    // own length so arrays replaced through the static setters are reset safely as well.
+    for (int i = 0; i < intDimensionSums.length; i++) {
+      intDimensionSums[i] = 0;
+    }
+    for (int i = 0; i < floatDimensionSums.length; i++) {
+      floatDimensionSums[i] = 0;
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowTuplesCounter = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  /**
+   * Emits tuples until either the per-window limit or the overall tuple limit is reached.
+   */
+  @Override
+  public void emitTuples()
+  {
+    while ((currentWindowTuplesCounter < maxTuplesPerWindow) && (tuplesCounter < maxTuples)) {
+      PythonProcessingPojo pythonProcessingPojo = new PythonProcessingPojo();
+      pythonProcessingPojo.setX(random.nextInt(MAX_RANDOM_INT));
+      pythonProcessingPojo.setY(random.nextFloat());
+      pythonProcessingPojo.setNumpyFloatArray(buildRandomFloatArray());
+      pythonProcessingPojo.setNumpyIntArray(buildRandomIntArray());
+      output.emit(pythonProcessingPojo);
+      currentWindowTuplesCounter += 1;
+      tuplesCounter += 1;
+    }
+  }
+
+  /**
+   * Builds a DIMENSION_SIZE x DIMENSION_SIZE float array of random values and adds them to floatDimensionSums.
+   */
+  private NDimensionalArray<float[]> buildRandomFloatArray()
+  {
+    float[] values = new float[ELEMENT_COUNT];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = random.nextFloat();
+      // NOTE(review): only slots 0..DIMENSION_SIZE-1 of the sums array are written (index i % DIMENSION_SIZE);
+      // confirm this is intended given the array has ELEMENT_COUNT slots.
+      floatDimensionSums[i % DIMENSION_SIZE] += values[i];
+    }
+    NDimensionalArray<float[]> array = new NDimensionalArray<>();
+    array.setData(values);
+    array.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
+    // The sequential length is the length of the flat data array itself
+    array.setLengthOfSequentialArray(values.length);
+    array.setSignedFlag(false);
+    return array;
+  }
+
+  /**
+   * Builds a DIMENSION_SIZE x DIMENSION_SIZE int array of random values and adds them to intDimensionSums.
+   */
+  private NDimensionalArray<int[]> buildRandomIntArray()
+  {
+    int[] values = new int[ELEMENT_COUNT];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = random.nextInt(MAX_RANDOM_INT);
+      // NOTE(review): same i % DIMENSION_SIZE indexing as the float sums; confirm intent
+      intDimensionSums[i % DIMENSION_SIZE] += values[i];
+    }
+    NDimensionalArray<int[]> array = new NDimensionalArray<>();
+    array.setData(values);
+    array.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
+    array.setLengthOfSequentialArray(values.length);
+    array.setSignedFlag(false);
+    return array;
+  }
+
+  public long getMaxTuples()
+  {
+    return maxTuples;
+  }
+
+  public void setMaxTuples(long maxTuples)
+  {
+    this.maxTuples = maxTuples;
+  }
+
+  public long getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  public static int[] getIntDimensionSums()
+  {
+    return intDimensionSums;
+  }
+
+  public static void setIntDimensionSums(int[] intDimensionSums)
+  {
+    PythonPayloadPOJOGenerator.intDimensionSums = intDimensionSums;
+  }
+
+  public static float[] getFloatDimensionSums()
+  {
+    return floatDimensionSums;
+  }
+
+  public static void setFloatDimensionSums(float[] floatDimensionSums)
+  {
+    PythonPayloadPOJOGenerator.floatDimensionSums = floatDimensionSums;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java b/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java
new file mode 100644
index 0000000..33c3452
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+/**
+ * Tuple type carried from the POJO generator to the python operator in the example application. Holds a scalar
+ * int and float together with a float and an int numpy array description.
+ */
+public class PythonProcessingPojo
+{
+  private float y;
+
+  private int x;
+
+  private NDimensionalArray<float[]> numpyFloatArray;
+
+  private NDimensionalArray<int[]> numpyIntArray;
+
+  /**
+   * Returns the scalar int value.
+   */
+  public int getX()
+  {
+    return this.x;
+  }
+
+  public void setX(int x)
+  {
+    this.x = x;
+  }
+
+  /**
+   * Returns the scalar float value.
+   */
+  public float getY()
+  {
+    return this.y;
+  }
+
+  public void setY(float y)
+  {
+    this.y = y;
+  }
+
+  /**
+   * Returns the description of the float numpy array.
+   */
+  public NDimensionalArray<float[]> getNumpyFloatArray()
+  {
+    return this.numpyFloatArray;
+  }
+
+  public void setNumpyFloatArray(NDimensionalArray<float[]> numpyFloatArray)
+  {
+    this.numpyFloatArray = numpyFloatArray;
+  }
+
+  /**
+   * Returns the description of the int numpy array.
+   */
+  public NDimensionalArray<int[]> getNumpyIntArray()
+  {
+    return this.numpyIntArray;
+  }
+
+  public void setNumpyIntArray(NDimensionalArray<int[]> numpyIntArray)
+  {
+    this.numpyIntArray = numpyIntArray;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java b/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java
new file mode 100644
index 0000000..69ece35
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.jep.SpinPolicy;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+import org.apache.apex.malhar.python.base.util.PythonRequestResponseUtil;
+
+/**
+ * Example python operator used by the test application. After setup it broadcasts commands that import numpy and
+ * create 2x2 int and float matrices of ones; for each incoming tuple it evaluates {@code np.add} on the int matrix
+ * and the tuple's int array and returns the response of one worker.
+ */
+public class SimplePythonOpOperator extends BasePythonExecutionOperator<PythonProcessingPojo>
+{
+  private Map<String,PythonRequestResponse<NDimensionalArray>> lastKnownResponse;
+
+  /**
+   * Requests numpy as a shared library and selects busy spinning for idle interpreters and the request queue.
+   */
+  @Override
+  public Map<PythonInterpreterConfig, Object> getPreInitConfigurations()
+  {
+    Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+    Set<String> sharedLibsList = new HashSet<>();
+    sharedLibsList.add("numpy");
+    preInitConfigs.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibsList);
+    // Next two configs allow for a very low latency mode wherein the cost of CPU is sacrificed for low latencies
+    // Defaults are saner and the following config is overriding
+    preInitConfigs.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, SpinPolicy.BUSY_SPIN.name());
+    preInitConfigs.put(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY,
+        com.conversantmedia.util.concurrent.SpinPolicy.SPINNING);
+    return preInitConfigs;
+  }
+
+  /**
+   * Evaluates {@code npval=np.add(intMatrix,intArrayToAdd)} on any worker and returns one worker's response.
+   *
+   * @return the response of one worker, or null if the engine returned no responses
+   */
+  @Override
+  public PythonRequestResponse processPythonCodeForIncomingTuple(PythonProcessingPojo input,
+      ApexPythonEngine pythonEngineRef) throws ApexPythonInterpreterException
+  {
+    Map<String,Object> evalParams = new HashMap<>();
+    evalParams.put("intArrayToAdd",input.getNumpyIntArray());
+    evalParams.put("floatArrayToAdd",input.getNumpyFloatArray());
+    String evalCommand = "npval=np.add(intMatrix,intArrayToAdd)";
+    // NOTE(review): the variable extracted is intMatrix, not npval, so the returned value does not contain the
+    // result of the addition; confirm this is intended.
+    PythonInterpreterRequest<NDimensionalArray> request = PythonRequestResponseUtil.buildRequestForEvalCommand(
+        evalCommand, evalParams, "intMatrix", false, 20, TimeUnit.MILLISECONDS, NDimensionalArray.class);
+    lastKnownResponse = pythonEngineRef.eval(
+        WorkerExecutionMode.ANY, currentWindowId, requestCounterForThisWindow, request);
+    if (lastKnownResponse == null || lastKnownResponse.isEmpty()) {
+      return null;
+    }
+    // we just need one of the N workers response.
+    return lastKnownResponse.values().iterator().next();
+  }
+
+  /**
+   * Broadcasts the imports and the matrix initialisation commands to the workers.
+   */
+  @Override
+  public void processPostSetUpPythonInstructions(ApexPythonEngine pythonEngineRef) throws ApexPythonInterpreterException
+  {
+    List<String> commandsToRun = new ArrayList<>();
+    commandsToRun.add("import sys");
+    commandsToRun.add("import numpy as np");
+    commandsToRun.add("intMatrix = np.ones((2,2),dtype=int)");
+    commandsToRun.add("floatMatrix = np.ones((2,2),dtype=float)");
+    pythonEngineRef.runCommands(WorkerExecutionMode.BROADCAST,0L,0L,
+        PythonRequestResponseUtil.buildRequestObjectForRunCommands(commandsToRun,1, TimeUnit.SECONDS));
+  }
+
+  public Map<String, PythonRequestResponse<NDimensionalArray>> getLastKnownResponse()
+  {
+    return lastKnownResponse;
+  }
+
+  public void setLastKnownResponse(Map<String, PythonRequestResponse<NDimensionalArray>> lastKnownResponse)
+  {
+    this.lastKnownResponse = lastKnownResponse;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java
new file mode 100644
index 0000000..7a1c1c1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.GenericCommandsRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+import org.apache.apex.malhar.python.test.BasePythonTest;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+public class BaseJEPTest extends BasePythonTest
+{
+ private static final transient Logger LOG = LoggerFactory.getLogger(BaseJEPTest.class);
+
+ public static boolean JEP_INITIALIZED = false;
+
+ private static Object lockToInitializeJEP = new Object();
+
+ static InterpreterThread pythonEngineThread;
+
+ static InterpreterWrapper interpreterWrapper;
+
+ static JepPythonEngine jepPythonEngine;
+
+ static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ static BlockingQueue<PythonRequestResponse> requestQueue =
+ new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+ static BlockingQueue<PythonRequestResponse> responseQueue =
+ new DisruptorBlockingQueue<PythonRequestResponse>(8,SpinPolicy.WAITING);
+
+ static BlockingQueue<PythonRequestResponse> delayedResponseQueueForWrapper =
+ new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+
+ public static void initJEPThread() throws Exception
+ {
+ if (!JEP_INITIALIZED) {
+ synchronized (lockToInitializeJEP) {
+ if (!JEP_INITIALIZED) {
+ // Interpreter for thread based tests
+ pythonEngineThread = new InterpreterThread(requestQueue,responseQueue,"unittests-1");
+ pythonEngineThread.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+ executorService.submit(pythonEngineThread);
+
+ // interpreter wrapper for wrapper based tests
+ interpreterWrapper = new InterpreterWrapper("unit-test-wrapper",delayedResponseQueueForWrapper,
+ SpinPolicy.SPINNING);
+ interpreterWrapper.startInterpreter();
+
+ // JEP python engine tests
+ jepPythonEngine = new JepPythonEngine("unit-tests-jeppythonengine",5);
+ jepPythonEngine.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+ jepPythonEngine.startInterpreter();
+ JEP_INITIALIZED = true;
+ }
+ }
+ }
+ }
+
+ private void setCommonConstructsForRequestResponseObject(PythonCommandType commandType,
+ PythonInterpreterRequest request, PythonRequestResponse requestResponse )
+ throws ApexPythonInterpreterException
+ {
+ request.setCommandType(commandType);
+ requestResponse.setRequestStartTime(System.currentTimeMillis());
+ requestResponse.setRequestId(1L);
+ requestResponse.setWindowId(1L);
+ switch (commandType) {
+ case EVAL_COMMAND:
+ EvalCommandRequestPayload payload = new EvalCommandRequestPayload();
+ request.setEvalCommandRequestPayload(payload);
+ break;
+ case METHOD_INVOCATION_COMMAND:
+ MethodCallRequestPayload methodCallRequest = new MethodCallRequestPayload();
+ request.setMethodCallRequest(methodCallRequest);
+ break;
+ case SCRIPT_COMMAND:
+ ScriptExecutionRequestPayload scriptPayload = new ScriptExecutionRequestPayload();
+ request.setScriptExecutionRequestPayload(scriptPayload);
+ break;
+ case GENERIC_COMMANDS:
+ GenericCommandsRequestPayload payloadForGenericCommands = new GenericCommandsRequestPayload();
+ request.setGenericCommandsRequestPayload(payloadForGenericCommands);
+ break;
+ default:
+ throw new ApexPythonInterpreterException("Unsupported command type");
+ }
+
+ }
+
+ public PythonRequestResponse<Void> buildRequestResponseObjectForVoidPayload(PythonCommandType commandType)
+ throws Exception
+ {
+ PythonRequestResponse<Void> requestResponse = new PythonRequestResponse();
+ PythonInterpreterRequest<Void> request = new PythonInterpreterRequest<>(Void.class);
+ PythonInterpreterResponse<Void> response = new PythonInterpreterResponse<>(Void.class);
+ requestResponse.setPythonInterpreterRequest(request);
+ requestResponse.setPythonInterpreterResponse(response);
+ setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+ return requestResponse;
+ }
+
+ public PythonRequestResponse<Long> buildRequestResponseObjectForLongPayload(
+ PythonCommandType commandType) throws Exception
+ {
+ PythonRequestResponse<Long> requestResponse = new PythonRequestResponse();
+ PythonInterpreterRequest<Long> request = new PythonInterpreterRequest<>(Long.class);
+ requestResponse.setPythonInterpreterRequest(request);
+ PythonInterpreterResponse<Long> response = new PythonInterpreterResponse<>(Long.class);
+ requestResponse.setPythonInterpreterRequest(request);
+ requestResponse.setPythonInterpreterResponse(response);
+ setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+ return requestResponse;
+ }
+
+
+
+ public PythonRequestResponse<Integer> buildRequestResponseObjectForIntPayload(
+ PythonCommandType commandType) throws Exception
+ {
+ PythonRequestResponse<Integer> requestResponse = new PythonRequestResponse();
+ PythonInterpreterRequest<Integer> request = new PythonInterpreterRequest<>(Integer.class);
+ requestResponse.setPythonInterpreterRequest(request);
+ PythonInterpreterResponse<Integer> response = new PythonInterpreterResponse<>(Integer.class);
+ requestResponse.setPythonInterpreterRequest(request);
+ requestResponse.setPythonInterpreterResponse(response);
+ setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+ return requestResponse;
+ }
+
+
+ protected PythonRequestResponse<Void> runCommands(List<String> commands) throws Exception
+ {
+ PythonRequestResponse<Void> runCommandsRequest = buildRequestResponseObjectForVoidPayload(
+ PythonCommandType.GENERIC_COMMANDS);
+ runCommandsRequest.getPythonInterpreterRequest().getGenericCommandsRequestPayload().setGenericCommands(commands);
+ pythonEngineThread.getRequestQueue().put(runCommandsRequest);
+ Thread.sleep(1000); // wait for command to be processed
+ return pythonEngineThread.getResponseQueue().poll(1, TimeUnit.SECONDS);
+ }
+
+
+ protected PythonInterpreterRequest<Long> buildRequestObjectForLongEvalCommand(String command, String returnVar,
+ Map<String,Object> paramsForEval, long timeOut, TimeUnit timeUnit, boolean deleteVariable)
+ {
+ PythonInterpreterRequest<Long> request = new PythonInterpreterRequest<>(Long.class);
+ request.setTimeout(timeOut);
+ request.setTimeUnit(timeUnit);
+ EvalCommandRequestPayload evalCommandRequestPayload = new EvalCommandRequestPayload();
+ request.setEvalCommandRequestPayload(evalCommandRequestPayload);
+ evalCommandRequestPayload.setParamsForEvalCommand(paramsForEval);
+ evalCommandRequestPayload.setDeleteVariableAfterEvalCall(deleteVariable);
+ evalCommandRequestPayload.setVariableNameToExtractInEvalCall(returnVar);
+ evalCommandRequestPayload.setEvalCommand(command);
+ request.setExpectedReturnType(Long.class);
+ return request;
+ }
+
+ protected PythonInterpreterRequest<Void> buildRequestObjectForVoidGenericCommand(List<String> commands, long timeOut,
+ TimeUnit timeUnit)
+ {
+ PythonInterpreterRequest<Void> genericCommandRequest = new PythonInterpreterRequest<>(Void.class);
+ genericCommandRequest.setTimeout(timeOut);
+ genericCommandRequest.setTimeUnit(timeUnit);
+ GenericCommandsRequestPayload genericCommandsRequestPayload = new GenericCommandsRequestPayload();
+ genericCommandsRequestPayload.setGenericCommands(commands);
+ genericCommandRequest.setExpectedReturnType(Void.class);
+ genericCommandRequest.setGenericCommandsRequestPayload(genericCommandsRequestPayload);
+ return genericCommandRequest;
+ }
+
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java
new file mode 100644
index 0000000..9e37605
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class InterpreterThreadTest extends BaseJEPTest
+{
+ private static final transient Logger LOG = LoggerFactory.getLogger(InterpreterThreadTest.class);
+
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testRunCommands() throws Exception
+ {
+ long currentTime = System.currentTimeMillis();
+ File tempFile = File.createTempFile("apexpythonunittestruncommands-", ".txt");
+ tempFile.deleteOnExit();
+ String filePath = tempFile.getAbsolutePath();
+ assertEquals(0L,tempFile.length());
+
+ List<String> commands = new ArrayList();
+ commands.add("fileHandle = open('" + filePath + "', 'w')");
+ commands.add("fileHandle.write('" + currentTime + "')");
+ commands.add("fileHandle.flush()");
+ commands.add("fileHandle.close()");
+ runCommands(commands);
+ assertEquals(("" + currentTime).length(), tempFile.length());
+
+ List<String> errorCommands = new ArrayList();
+ errorCommands.add("1+2");
+ errorCommands.add("3+");
+ PythonRequestResponse<Void> response = runCommands(errorCommands);
+ Map<String,Boolean> responseStatus = response.getPythonInterpreterResponse().getCommandStatus();
+ assertTrue(responseStatus.get(errorCommands.get(0)));
+ assertFalse(responseStatus.get(errorCommands.get(1)));
+ }
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testMethodCall() throws Exception
+ {
+ String methodName = "jepMultiply";
+ List<String> commands = new ArrayList();
+ commands.add("def " + methodName + "(firstnum, secondnum):\n" +
+ "\treturn (firstnum * secondnum)\n"); // Note that this cannot be split as multiple commands
+ runCommands(commands);
+
+ List<Object> params = new ArrayList<>();
+ params.add(5L);
+ params.add(25L);
+
+ PythonRequestResponse<Long> methodCallRequest = buildRequestResponseObjectForLongPayload(
+ PythonCommandType.METHOD_INVOCATION_COMMAND);
+ MethodCallRequestPayload requestPayload = methodCallRequest.getPythonInterpreterRequest().getMethodCallRequest();
+ requestPayload.setNameOfMethod(methodName);
+ requestPayload.setArgs(params);
+ methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+
+ pythonEngineThread.getRequestQueue().put(methodCallRequest);
+ Thread.sleep(1000); // wait for command to be processed
+ PythonRequestResponse<Long> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+ TimeUnit.SECONDS);
+ assertEquals(methodCallResponse.getPythonInterpreterResponse().getResponse(),125L);
+ Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+ assertTrue(commandStatus.get(methodName));
+
+ params.remove(1);
+ methodCallRequest = buildRequestResponseObjectForLongPayload(PythonCommandType.METHOD_INVOCATION_COMMAND);
+ requestPayload = methodCallRequest
+ .getPythonInterpreterRequest().getMethodCallRequest();
+ requestPayload.setNameOfMethod(methodName);
+ requestPayload.setArgs(params);
+ methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+
+ pythonEngineThread.getRequestQueue().put(methodCallRequest);
+ Thread.sleep(1000); // wait for command to be processed
+ methodCallResponse = pythonEngineThread.getResponseQueue().poll(1, TimeUnit.SECONDS);
+ commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+ assertFalse(commandStatus.get(methodName));
+ }
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testScriptCall() throws Exception
+ {
+ File tempFileForScript = File.createTempFile("apexpythonunittestscript-", ".py");
+ tempFileForScript.deleteOnExit();
+ String filePathForFactorialScript = tempFileForScript.getAbsolutePath();
+ migrateFileFromResourcesFolderToTemp("factorial.py",filePathForFactorialScript);
+ PythonRequestResponse<Void> methodCallRequest = buildRequestResponseObjectForVoidPayload(
+ PythonCommandType.SCRIPT_COMMAND);
+ methodCallRequest.getPythonInterpreterRequest().getScriptExecutionRequestPayload().setScriptName(
+ filePathForFactorialScript);
+ pythonEngineThread.getRequestQueue().put(methodCallRequest);
+ Thread.sleep(1000); // wait for command to be processed
+ PythonRequestResponse<Void> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+ TimeUnit.SECONDS);
+ Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+ assertTrue(commandStatus.get(filePathForFactorialScript));
+ try (BufferedReader br = new BufferedReader(new FileReader("target/factorial-result.txt"))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ assertEquals(120,Integer.parseInt(line)); // asset factorial is calculated as written in script in resources
+ break; // There is only one line in the file per the python script
+ }
+ }
+ }
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testEvalCall() throws Exception
+ {
+ String expression = new String("x = a + b");
+ Random random = new Random();
+ int a = random.nextInt(100);
+ int b = random.nextInt(100);
+ Map<String,Object> argsForEval = new HashMap<>();
+ argsForEval.put("a",a);
+ argsForEval.put("b",b);
+ PythonRequestResponse<Long> methodCallRequest = buildRequestResponseObjectForLongPayload(
+ PythonCommandType.EVAL_COMMAND);
+ EvalCommandRequestPayload payload = methodCallRequest.getPythonInterpreterRequest().getEvalCommandRequestPayload();
+ payload.setEvalCommand(expression);
+ payload.setParamsForEvalCommand(argsForEval);
+ payload.setDeleteVariableAfterEvalCall(true);
+ payload.setVariableNameToExtractInEvalCall("x");
+ methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+ pythonEngineThread.getRequestQueue().put(methodCallRequest);
+ Thread.sleep(1000); // wait for command to be processed
+ PythonRequestResponse<Integer> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+ TimeUnit.SECONDS);
+ Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+ assertTrue(commandStatus.get(expression));
+ assertEquals(methodCallResponse.getPythonInterpreterResponse().getResponse(),(long)(a + b));
+ }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java
new file mode 100644
index 0000000..6c2877d
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class InterpreterWrapperTest extends BaseJEPTest
+{
+ private static final transient Logger LOG = LoggerFactory.getLogger(InterpreterWrapperTest.class);
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testTimeOuts() throws Exception
+ {
+ List<String> sequenceOfCommands = new ArrayList();
+ sequenceOfCommands.add("import time");
+ sequenceOfCommands.add("time.sleep(1)");
+
+ PythonInterpreterRequest<Void> requestOne = buildRequestObjectForVoidGenericCommand(
+ sequenceOfCommands,3,TimeUnit.SECONDS);
+ PythonRequestResponse<Void> resultOne = interpreterWrapper.runCommands(1L,1L, requestOne);
+ assertNotNull(resultOne);
+
+ requestOne.setTimeUnit(TimeUnit.MILLISECONDS);
+ requestOne.setTimeout(5);
+ PythonRequestResponse<Void> resultTWo = interpreterWrapper.runCommands(1L,1L,requestOne);
+ assertNull(resultTWo);
+
+ }
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testDelayedResponseQueue() throws Exception
+ {
+ List<String> sequenceOfCommands = new ArrayList();
+ sequenceOfCommands.add("import time");
+ sequenceOfCommands.add("x=4;time.sleep(1)");
+
+ PythonInterpreterRequest<Void> requestOne = buildRequestObjectForVoidGenericCommand(
+ sequenceOfCommands,300,TimeUnit.MILLISECONDS);
+ PythonRequestResponse<Void> resultOne = interpreterWrapper.runCommands(1L,1L,requestOne);
+
+ HashMap<String,Object> evalParams = new HashMap<>();
+ evalParams.put("y", 2);
+
+ PythonInterpreterRequest<Long> requestTwo = buildRequestObjectForLongEvalCommand(
+ "x = x * y;time.sleep(1)","x",evalParams,10,TimeUnit.MILLISECONDS,false);
+ PythonRequestResponse<Long> result = interpreterWrapper.eval(1L,1L,requestTwo);
+
+ Thread.sleep(3000);
+
+ // only the next command will result in the queue getting drained of previous requests hence below
+ sequenceOfCommands = new ArrayList();
+ sequenceOfCommands.add("x=5");
+
+ PythonInterpreterRequest<Void> requestThree = buildRequestObjectForVoidGenericCommand(
+ sequenceOfCommands,300,TimeUnit.MILLISECONDS);
+ PythonRequestResponse<Void> resultThree = interpreterWrapper.runCommands(1L,1L,requestThree);
+
+ assertFalse(delayedResponseQueueForWrapper.isEmpty());
+ assertEquals(2, delayedResponseQueueForWrapper.drainTo(new ArrayList<>()));
+
+ }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java
new file mode 100644
index 0000000..98a1ee1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class JepPythonEngineTest extends BaseJEPTest
+{
+ private static final transient Logger LOG = LoggerFactory.getLogger(JepPythonEngineTest.class);
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testSelectWorkerForCurrentCall() throws Exception
+ {
+ Set<String> overloadedWorkers = new HashSet<>();
+ List<String> busyCommands = new ArrayList<>();
+ busyCommands.add("import time");
+ busyCommands.add("time.sleep(5)");
+ PythonInterpreterRequest<Void> busyCommandRequest = buildRequestObjectForVoidGenericCommand(busyCommands,
+ 2,TimeUnit.MILLISECONDS);
+
+ for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads() - 1; i++) {
+ InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+ aWrapper.runCommands(1L,i,busyCommandRequest);
+ assertTrue(aWrapper.isCurrentlyBusy());
+ overloadedWorkers.add(aWrapper.getInterpreterId());
+ }
+ InterpreterWrapper candidateWrapperForExecution = null;
+ InterpreterWrapper validCandidateWrapperForExecution = null;
+ int counterForNullWorkers = 0;
+ int counterForValidWorkers = 0;
+ for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+ candidateWrapperForExecution = jepPythonEngine.selectWorkerForCurrentCall(i);
+ if ( candidateWrapperForExecution == null) {
+ counterForNullWorkers += 1;
+ } else {
+ counterForValidWorkers += 1;
+ validCandidateWrapperForExecution = candidateWrapperForExecution;
+ }
+ }
+ // numWorker threads because the select worker calls iterates over all workers to
+ // get any of the free workers. We did not give any worker for the 5th worker and hence should pass for all calls
+ assertEquals(jepPythonEngine.getNumWorkerThreads(), counterForValidWorkers);
+ assertEquals( 0, counterForNullWorkers); // None of the attempts should fail to get a worker
+ // Also we can only get that worker which has not been assigned a sleep instruction
+ assertFalse(overloadedWorkers.contains(validCandidateWrapperForExecution.getInterpreterId()));
+ Thread.sleep(5000); // all the python workers must be free after this line
+ // we now test for all workers being busy
+ for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+ InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+ aWrapper.runCommands(1L,i,busyCommandRequest);
+ }
+ for (int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+ candidateWrapperForExecution = jepPythonEngine.selectWorkerForCurrentCall(i);
+ assertNull(candidateWrapperForExecution);
+ }
+ Thread.sleep(5000); // ensures other tests in this class can use this engine after this test
+ }
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testWorkerExecutionMode() throws Exception
+ {
+ String methodName = "multiply";
+ List<String> commands = new ArrayList();
+ commands.add("def " + methodName + "(firstnum, secondnum):\n" +
+ "\treturn (firstnum * secondnum)\n"); // Note that this cannot be split as multiple commands
+
+ PythonInterpreterRequest<Void> requestForDef = buildRequestObjectForVoidGenericCommand(commands,1000,
+ TimeUnit.MILLISECONDS);
+
+ for (int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+ InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+ aWrapper.runCommands(1L,1L,requestForDef);
+ }
+
+ HashMap<String,Object> params = new HashMap<>();
+ params.put("y",3);
+ Map<String,PythonRequestResponse<Long>> response = jepPythonEngine.eval(WorkerExecutionMode.BROADCAST,
+ 1L,1L,buildRequestObjectForLongEvalCommand("x=multiply(4,y)",
+ "x", params,1000,TimeUnit.MILLISECONDS,false));
+ for (String aWorkerId: response.keySet()) {
+ assertEquals(12L,response.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+ }
+ assertEquals(jepPythonEngine.getNumWorkerThreads(),response.size()); // ensure all workers responded
+
+ params = new HashMap<>();
+ params.put("y",6);
+ response = jepPythonEngine.eval(WorkerExecutionMode.ANY,
+ 1L,1L,buildRequestObjectForLongEvalCommand("x=multiply(4,y)",
+ "x", params,1000,TimeUnit.MILLISECONDS,false));
+ for (String aWorkerId: response.keySet()) {
+ assertEquals(24L,response.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+ }
+ assertEquals(1,response.size()); // ensure all workers responded
+ Thread.sleep(5000); // ensure subsequent tests are not impacted by busy flags from current test
+ }
+
+ @JepPythonTestContext(jepPythonBasedTest = true)
+ @Test
+ public void testPostStartInterpreterLogic() throws Exception
+ {
+ JepPythonEngine pythonEngineForPostInit = new JepPythonEngine("unit-tests-jeppythonengine-preint",
+ 5);
+ List<String> commandsForPreInit = new ArrayList<>();
+ commandsForPreInit.add("x=5");
+ PythonRequestResponse<Void> aHistoryCommand = buildRequestResponseObjectForVoidPayload(
+ PythonCommandType.GENERIC_COMMANDS);
+ aHistoryCommand.getPythonInterpreterRequest()
+ .getGenericCommandsRequestPayload().setGenericCommands(commandsForPreInit);
+ aHistoryCommand.getPythonInterpreterRequest().setTimeout(10);
+ aHistoryCommand.getPythonInterpreterRequest().setTimeUnit(TimeUnit.MILLISECONDS);
+ List<PythonRequestResponse> historyOfCommands = new ArrayList<>();
+ historyOfCommands.add(aHistoryCommand);
+ pythonEngineForPostInit.setCommandHistory(historyOfCommands);
+
+ pythonEngineForPostInit.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+ pythonEngineForPostInit.startInterpreter();
+
+ pythonEngineForPostInit.postStartInterpreter();
+
+ HashMap<String,Object> params = new HashMap<>();
+ params.put("y",4);
+ Map<String,PythonRequestResponse<Long>> resultOfExecution = pythonEngineForPostInit.eval(
+ WorkerExecutionMode.BROADCAST, 1L,1L, buildRequestObjectForLongEvalCommand(
+ "x=x+y","x",params,100,TimeUnit.MILLISECONDS,false));
+ assertEquals(pythonEngineForPostInit.getNumWorkerThreads(),resultOfExecution.size());
+ for (String aWorkerId : resultOfExecution.keySet()) {
+ assertEquals(9L, resultOfExecution.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+ }
+ }
+}
+
+
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java b/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java
new file mode 100644
index 0000000..0807da1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import java.io.File;
+
+import org.junit.Rule;
+
+import org.apache.commons.io.FileUtils;
+
+public class BasePythonTest
+{
+ @Rule
+ public PythonAvailabilityTestRule jepAvailabilityBasedTest = new PythonAvailabilityTestRule();
+
+ protected void migrateFileFromResourcesFolderToTemp(String resourceFileName,String targetFilePath) throws Exception
+ {
+ ClassLoader classLoader = getClass().getClassLoader();
+ File outFile = new File(targetFilePath);
+ FileUtils.copyInputStreamToFile(classLoader.getResourceAsStream(resourceFileName), outFile);
+ }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java b/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java
new file mode 100644
index 0000000..efa3b7c
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares the Python requirements of a test method so that {@link PythonAvailabilityTestRule} can decide
+ * whether the test is run. Tests that need the JEP Python bridge set {@code jepPythonBasedTest = true} and
+ * are then only run when JEP is available at the time the tests are launched.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface JepPythonTestContext
+{
+  /**
+   * @return {@code true} when the annotated test needs a JEP-enabled Python installation
+   */
+  boolean jepPythonBasedTest() default false;
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java b/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java
new file mode 100644
index 0000000..118e2fe
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.jep.BaseJEPTest;
+
+/**
+ * A JUnit rule that bypasses tests which cannot run without a JEP-enabled Python installation.
+ * <p>
+ * Tests that need Python are annotated with {@code @JepPythonTestContext(jepPythonBasedTest = true)}. Such tests
+ * only run when the system property {@link #JEP_LIBRARY_INSTALLED_SWITCH} is set on the command line, e.g.
+ * {@code -DjepInstallPath=/path/to/jep/library}. Tests that are not annotated, or that are annotated with
+ * {@code jepPythonBasedTest = false}, always run. When the property is set, the shared JEP interpreter thread is
+ * initialized (once) before a test that is going to run.
+ * </p>
+ */
+public class PythonAvailabilityTestRule implements TestRule
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(PythonAvailabilityTestRule.class);
+
+  /** Name of the system property holding the JEP library path; its presence signals that JEP is installed. */
+  public static final String JEP_LIBRARY_INSTALLED_SWITCH = "jepInstallPath";
+
+  @Override
+  public Statement apply(Statement base, Description description)
+  {
+    JepPythonTestContext testContext = description.getAnnotation(JepPythonTestContext.class);
+    String jepInstallPath = System.getProperty(JEP_LIBRARY_INSTALLED_SWITCH);
+    boolean jepInstalled = (jepInstallPath != null);
+    if (jepInstalled) {
+      LOG.debug("Using {} as the library path for python interpreter", jepInstallPath);
+    }
+    // Only tests that explicitly declare a JEP dependency are gated on JEP being available; everything else
+    // (no annotation, or jepPythonBasedTest = false) does not need python and is always run.
+    boolean requiresJep = (testContext != null) && testContext.jepPythonBasedTest();
+    if (requiresJep && !jepInstalled) {
+      // bypass the test altogether
+      return new Statement()
+      {
+        @Override
+        public void evaluate() throws Throwable
+        {
+          // Intentionally empty: the test needs JEP, which is not available.
+        }
+      };
+    }
+    if (jepInstalled && !BaseJEPTest.JEP_INITIALIZED) {
+      try {
+        BaseJEPTest.initJEPThread();
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to initialize the JEP interpreter thread", e);
+      }
+    }
+    return base;
+  }
+
+}
diff --git a/python/src/test/resources/META-INF/properties.xml b/python/src/test/resources/META-INF/properties.xml
new file mode 100644
index 0000000..652f981
--- /dev/null
+++ b/python/src/test/resources/META-INF/properties.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<configuration>
+ <!--
+   JVM options for the operator presumably named "pythonprocessor" in the test DAG (confirm against the test
+   application). Points java.library.path at the JEP native library so the embedded Python interpreter can load.
+   NOTE(review): ${jepInstallPath} is expected to be resolved from the -DjepInstallPath system property, the same
+   switch PythonAvailabilityTestRule checks - confirm that variable expansion applies here.
+ -->
+ <property>
+ <name>dt.operator.pythonprocessor.attr.JVM_OPTIONS</name>
+ <value> -Djava.library.path=${jepInstallPath}</value>
+ </property>
+</configuration>
diff --git a/python/src/test/resources/factorial.py b/python/src/test/resources/factorial.py
new file mode 100755
index 0000000..ca471b8
--- /dev/null
+++ b/python/src/test/resources/factorial.py
@@ -0,0 +1,49 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+
+import sys, getopt, os
+
+os.getcwd()
+outputfile = 'target/factorial-result.txt'
+number = 5
+
+
+def factorial(num):
+ if num <= 0:
+ return 1
+ else:
+ factorial = 1
+ for i in range(1,num + 1):
+ factorial = factorial*i
+ return factorial
+
+def writeFile(factorial):
+ fileHandle = open(outputfile,'w')
+ fileHandle.write(factorial+'\n')
+ fileHandle.flush()
+ fileHandle.close()
+
+
+if __name__ == "__main__":
+ writeFile(str(factorial(number)))
+
+
diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ecb3668
--- /dev/null
+++ b/python/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger: everything at DEBUG and above goes to the CONSOLE appender defined below.
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+# Console threshold comes from test.log.console.threshold, which defaults to DEBUG on the next line.
+# NOTE(review): log4j 1.x substitution is expected to let -Dtest.log.console.threshold override it - confirm.
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+
+# Override the DEBUG root level so framework and Malhar loggers only emit INFO and above.
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+# Explicit levels for the python module's packages. They currently match the inherited INFO level but
+# allow each package (e.g. the JEP integration) to be switched to DEBUG independently.
+log4j.logger.org.apache.apex.malhar.python=INFO
+log4j.logger.org.apache.apex.malhar.python.base=INFO
+log4j.logger.org.apache.apex.malhar.python.base.jep=INFO
--
To stop receiving notification emails like this one, please contact
thw@apache.org.